Whamcloud - gitweb
No new functionality outside of the DLM. ptlrpc_client no longer contains
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/config.h>
22 #include <linux/module.h>
23 #include <linux/kernel.h>
24 #include <linux/mm.h>
25 #include <linux/string.h>
26 #include <linux/stat.h>
27 #include <linux/errno.h>
28 #include <linux/locks.h>
29 #include <linux/unistd.h>
30
31 #include <asm/system.h>
32 #include <asm/uaccess.h>
33
34 #include <linux/fs.h>
35 #include <linux/stat.h>
36 #include <asm/uaccess.h>
37 #include <asm/segment.h>
38 #include <linux/miscdevice.h>
39
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <linux/obd_class.h>
43 #include <linux/lustre_lib.h>
44 #include <linux/lustre_net.h>
45 #include <linux/obd_ost.h>
46
47 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
48                        struct lustre_peer **peer)
49 {
50         struct osc_obd *osc = &conn->oc_dev->u.osc;
51         *cl = osc->osc_client;
52         *peer = &osc->osc_peer;
53 }
54
55 static int osc_connect(struct obd_conn *conn)
56 {
57         struct ptlrpc_request *request;
58         struct ptlrpc_client *cl;
59         struct lustre_peer *peer;
60         struct ost_body *body;
61         int rc, size = sizeof(*body);
62         ENTRY;
63
64         osc_con2cl(conn, &cl, &peer);
65         request = ptlrpc_prep_req(cl, peer, OST_CONNECT, 0, NULL, NULL);
66         if (!request)
67                 RETURN(-ENOMEM);
68
69         request->rq_replen = lustre_msg_size(1, &size);
70
71         rc = ptlrpc_queue_wait(cl, request);
72         if (rc)
73                 GOTO(out, rc);
74
75         body = lustre_msg_buf(request->rq_repmsg, 0);
76         CDEBUG(D_INODE, "received connid %d\n", body->connid);
77
78         conn->oc_id = body->connid;
79         EXIT;
80  out:
81         ptlrpc_free_req(request);
82         return rc;
83 }
84
85 static int osc_disconnect(struct obd_conn *conn)
86 {
87         struct ptlrpc_request *request;
88         struct ptlrpc_client *cl;
89         struct lustre_peer *peer;
90         struct ost_body *body;
91         int rc, size = sizeof(*body);
92         ENTRY;
93
94         osc_con2cl(conn, &cl, &peer);
95         request = ptlrpc_prep_req(cl, peer, OST_DISCONNECT, 1, &size, NULL);
96         if (!request)
97                 RETURN(-ENOMEM);
98
99         body = lustre_msg_buf(request->rq_reqmsg, 0);
100         body->connid = conn->oc_id;
101
102         request->rq_replen = lustre_msg_size(1, &size);
103
104         rc = ptlrpc_queue_wait(cl, request);
105         GOTO(out, rc);
106  out:
107         ptlrpc_free_req(request);
108         return rc;
109 }
110
111 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
112 {
113         struct ptlrpc_request *request;
114         struct ptlrpc_client *cl;
115         struct lustre_peer *peer;
116         struct ost_body *body;
117         int rc, size = sizeof(*body);
118         ENTRY;
119
120         osc_con2cl(conn, &cl, &peer);
121         request = ptlrpc_prep_req(cl, peer, OST_GETATTR, 1, &size, NULL);
122         if (!request)
123                 RETURN(-ENOMEM);
124
125         body = lustre_msg_buf(request->rq_reqmsg, 0);
126         memcpy(&body->oa, oa, sizeof(*oa));
127         body->connid = conn->oc_id;
128         body->oa.o_valid = ~0;
129
130         request->rq_replen = lustre_msg_size(1, &size);
131
132         rc = ptlrpc_queue_wait(cl, request);
133         if (rc)
134                 GOTO(out, rc);
135
136         body = lustre_msg_buf(request->rq_repmsg, 0);
137         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
138         if (oa)
139                 memcpy(oa, &body->oa, sizeof(*oa));
140
141         EXIT;
142  out:
143         ptlrpc_free_req(request);
144         return 0;
145 }
146
147 static int osc_open(struct obd_conn *conn, struct obdo *oa)
148 {
149         struct ptlrpc_request *request;
150         struct ptlrpc_client *cl;
151         struct lustre_peer *peer;
152         struct ost_body *body;
153         int rc, size = sizeof(*body);
154         ENTRY;
155
156         osc_con2cl(conn, &cl, &peer);
157         request = ptlrpc_prep_req(cl, peer, OST_OPEN, 1, &size, NULL);
158         if (!request)
159                 RETURN(-ENOMEM);
160
161         body = lustre_msg_buf(request->rq_reqmsg, 0);
162         memcpy(&body->oa, oa, sizeof(*oa));
163         body->connid = conn->oc_id;
164         if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
165                 LBUG();
166
167         request->rq_replen = lustre_msg_size(1, &size);
168
169         rc = ptlrpc_queue_wait(cl, request);
170         if (rc)
171                 GOTO(out, rc);
172
173         body = lustre_msg_buf(request->rq_repmsg, 0);
174         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
175         if (oa)
176                 memcpy(oa, &body->oa, sizeof(*oa));
177
178         EXIT;
179  out:
180         ptlrpc_free_req(request);
181         return 0;
182 }
183
184 static int osc_close(struct obd_conn *conn, struct obdo *oa)
185 {
186         struct ptlrpc_request *request;
187         struct ptlrpc_client *cl;
188         struct lustre_peer *peer;
189         struct ost_body *body;
190         int rc, size = sizeof(*body);
191         ENTRY;
192
193         osc_con2cl(conn, &cl, &peer);
194         request = ptlrpc_prep_req(cl, peer, OST_CLOSE, 1, &size, NULL);
195         if (!request)
196                 RETURN(-ENOMEM);
197
198         body = lustre_msg_buf(request->rq_reqmsg, 0);
199         memcpy(&body->oa, oa, sizeof(*oa));
200         body->connid = conn->oc_id;
201
202         request->rq_replen = lustre_msg_size(1, &size);
203
204         rc = ptlrpc_queue_wait(cl, request);
205         if (rc)
206                 GOTO(out, rc);
207
208         body = lustre_msg_buf(request->rq_repmsg, 0);
209         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
210         if (oa)
211                 memcpy(oa, &body->oa, sizeof(*oa));
212
213         EXIT;
214  out:
215         ptlrpc_free_req(request);
216         return 0;
217 }
218
219 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
220 {
221         struct ptlrpc_request *request;
222         struct ptlrpc_client *cl;
223         struct lustre_peer *peer;
224         struct ost_body *body;
225         int rc, size = sizeof(*body);
226         ENTRY;
227
228         osc_con2cl(conn, &cl, &peer);
229         request = ptlrpc_prep_req(cl, peer, OST_SETATTR, 1, &size, NULL);
230         if (!request)
231                 RETURN(-ENOMEM);
232
233         body = lustre_msg_buf(request->rq_reqmsg, 0);
234         memcpy(&body->oa, oa, sizeof(*oa));
235         body->connid = conn->oc_id;
236
237         request->rq_replen = lustre_msg_size(1, &size);
238
239         rc = ptlrpc_queue_wait(cl, request);
240         GOTO(out, rc);
241
242  out:
243         ptlrpc_free_req(request);
244         return 0;
245 }
246
247 static int osc_create(struct obd_conn *conn, struct obdo *oa)
248 {
249         struct ptlrpc_request *request;
250         struct ptlrpc_client *cl;
251         struct lustre_peer *peer;
252         struct ost_body *body;
253         int rc, size = sizeof(*body);
254         ENTRY;
255
256         if (!oa) {
257                 CERROR("oa NULL\n");
258                 RETURN(-EINVAL);
259         }
260         osc_con2cl(conn, &cl, &peer);
261         request = ptlrpc_prep_req(cl, peer, OST_CREATE, 1, &size, NULL);
262         if (!request)
263                 RETURN(-ENOMEM);
264
265         body = lustre_msg_buf(request->rq_reqmsg, 0);
266         memcpy(&body->oa, oa, sizeof(*oa));
267         body->oa.o_valid = ~0;
268         body->connid = conn->oc_id;
269
270         request->rq_replen = lustre_msg_size(1, &size);
271
272         rc = ptlrpc_queue_wait(cl, request);
273         if (rc)
274                 GOTO(out, rc);
275
276         body = lustre_msg_buf(request->rq_repmsg, 0);
277         memcpy(oa, &body->oa, sizeof(*oa));
278
279         EXIT;
280  out:
281         ptlrpc_free_req(request);
282         return 0;
283 }
284
285 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
286                      obd_off offset)
287 {
288         struct ptlrpc_request *request;
289         struct ptlrpc_client *cl;
290         struct lustre_peer *peer;
291         struct ost_body *body;
292         int rc, size = sizeof(*body);
293         ENTRY;
294
295         if (!oa) {
296                 CERROR("oa NULL\n");
297                 RETURN(-EINVAL);
298         }
299         osc_con2cl(conn, &cl, &peer);
300         request = ptlrpc_prep_req(cl, peer, OST_PUNCH, 1, &size, NULL);
301         if (!request)
302                 RETURN(-ENOMEM);
303
304         body = lustre_msg_buf(request->rq_reqmsg, 0);
305         memcpy(&body->oa, oa, sizeof(*oa));
306         body->connid = conn->oc_id;
307         body->oa.o_valid = ~0;
308         body->oa.o_size = offset;
309         body->oa.o_blocks = count;
310
311         request->rq_replen = lustre_msg_size(1, &size);
312
313         rc = ptlrpc_queue_wait(cl, request);
314         if (rc)
315                 GOTO(out, rc);
316
317         body = lustre_msg_buf(request->rq_repmsg, 0);
318         memcpy(oa, &body->oa, sizeof(*oa));
319
320         EXIT;
321  out:
322         ptlrpc_free_req(request);
323         return 0;
324 }
325
326 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
327 {
328         struct ptlrpc_request *request;
329         struct ptlrpc_client *cl;
330         struct lustre_peer *peer;
331         struct ost_body *body;
332         int rc, size = sizeof(*body);
333         ENTRY;
334
335         if (!oa) {
336                 CERROR("oa NULL\n");
337                 RETURN(-EINVAL);
338         }
339         osc_con2cl(conn, &cl, &peer);
340         request = ptlrpc_prep_req(cl, peer, OST_DESTROY, 1, &size, NULL);
341         if (!request)
342                 RETURN(-ENOMEM);
343
344         body = lustre_msg_buf(request->rq_reqmsg, 0);
345         memcpy(&body->oa, oa, sizeof(*oa));
346         body->connid = conn->oc_id;
347         body->oa.o_valid = ~0;
348
349         request->rq_replen = lustre_msg_size(1, &size);
350
351         rc = ptlrpc_queue_wait(cl, request);
352         if (rc)
353                 GOTO(out, rc);
354
355         body = lustre_msg_buf(request->rq_repmsg, 0);
356         memcpy(oa, &body->oa, sizeof(*oa));
357
358         EXIT;
359  out:
360         ptlrpc_free_req(request);
361         return 0;
362 }
363
364 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
365                  struct niobuf *dst, struct niobuf *src)
366 {
367         struct ptlrpc_client *cl;
368         struct lustre_peer *peer;
369
370         osc_con2cl(conn, &cl, &peer);
371
372         if (cl->cli_obd) {
373                 /* local sendpage */
374                 memcpy((char *)(unsigned long)dst->addr,
375                        (char *)(unsigned long)src->addr, src->len);
376         } else {
377                 struct ptlrpc_bulk_desc *bulk;
378                 int rc;
379
380                 bulk = ptlrpc_prep_bulk(peer);
381                 if (bulk == NULL)
382                         return -ENOMEM;
383
384                 bulk->b_buf = (void *)(unsigned long)src->addr;
385                 bulk->b_buflen = src->len;
386                 bulk->b_xid = dst->xid;
387                 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
388                 if (rc != 0) {
389                         CERROR("send_bulk failed: %d\n", rc);
390                         LBUG();
391                         return rc;
392                 }
393                 wait_event_interruptible(bulk->b_waitq,
394                                          ptlrpc_check_bulk_sent(bulk));
395
396                 if (bulk->b_flags == PTL_RPC_INTR) {
397                         EXIT;
398                         /* FIXME: hey hey, we leak here. */
399                         return -EINTR;
400                 }
401
402                 OBD_FREE(bulk, sizeof(*bulk));
403         }
404
405         return 0;
406 }
407
408 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
409                  obd_count *oa_bufs, struct page **buf, obd_size *count,
410                  obd_off *offset, obd_flag *flags)
411 {
412         struct ptlrpc_client *cl;
413         struct lustre_peer *peer;
414         struct ptlrpc_request *request;
415         struct ost_body *body;
416         struct obd_ioobj ioo;
417         struct niobuf src;
418         int pages, rc, i, j, size[3] = {sizeof(*body)};
419         void *ptr1, *ptr2;
420         struct ptlrpc_bulk_desc **bulk;
421         ENTRY;
422
423         size[1] = num_oa * sizeof(ioo);
424         pages = 0;
425         for (i = 0; i < num_oa; i++)
426                 pages += oa_bufs[i];
427         size[2] = pages * sizeof(src);
428
429         OBD_ALLOC(bulk, pages * sizeof(*bulk));
430         if (bulk == NULL)
431                 RETURN(-ENOMEM);
432
433         osc_con2cl(conn, &cl, &peer);
434         request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL);
435         if (!request)
436                 GOTO(out, rc = -ENOMEM);
437
438         body = lustre_msg_buf(request->rq_reqmsg, 0);
439         body->data = OBD_BRW_READ;
440
441         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
442         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
443         for (pages = 0, i = 0; i < num_oa; i++) {
444                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
445                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
446                         bulk[pages] = ptlrpc_prep_bulk(peer);
447                         if (bulk[pages] == NULL)
448                                 GOTO(out, rc = -ENOMEM);
449
450                         spin_lock(&cl->cli_lock);
451                         bulk[pages]->b_xid = cl->cli_xid++;
452                         spin_unlock(&cl->cli_lock);
453
454                         bulk[pages]->b_buf = kmap(buf[pages]);
455                         bulk[pages]->b_buflen = PAGE_SIZE;
456                         bulk[pages]->b_portal = OST_BULK_PORTAL;
457                         ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
458                                         offset[pages], count[pages],
459                                         flags[pages], bulk[pages]->b_xid);
460
461                         rc = ptlrpc_register_bulk(bulk[pages]);
462                         if (rc)
463                                 GOTO(out, rc);
464                 }
465         }
466
467         request->rq_replen = lustre_msg_size(1, size);
468         rc = ptlrpc_queue_wait(cl, request);
469         GOTO(out, rc);
470
471  out:
472         /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
473          * abort those bulk listeners. */
474
475         for (pages = 0, i = 0; i < num_oa; i++) {
476                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
477                         if (bulk[pages] == NULL)
478                                 continue;
479                         kunmap(buf[pages]);
480                         OBD_FREE(bulk[pages], sizeof(**bulk));
481                 }
482         }
483
484         OBD_FREE(bulk, pages * sizeof(*bulk));
485         ptlrpc_free_req(request);
486         return rc;
487 }
488
489 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
490                   obd_count *oa_bufs, struct page **buf, obd_size *count,
491                   obd_off *offset, obd_flag *flags)
492 {
493         struct ptlrpc_client *cl;
494         struct lustre_peer *peer;
495         struct ptlrpc_request *request;
496         struct obd_ioobj ioo;
497         struct ost_body *body;
498         struct niobuf *src;
499         int pages, rc, i, j, size[3] = {sizeof(*body)};
500         void *ptr1, *ptr2;
501         ENTRY;
502
503         size[1] = num_oa * sizeof(ioo);
504         pages = 0;
505         for (i = 0; i < num_oa; i++)
506                 pages += oa_bufs[i];
507         size[2] = pages * sizeof(*src);
508
509         OBD_ALLOC(src, size[2]);
510         if (!src)
511                 RETURN(-ENOMEM);
512
513         osc_con2cl(conn, &cl, &peer);
514         request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL);
515         if (!request)
516                 RETURN(-ENOMEM);
517         body = lustre_msg_buf(request->rq_reqmsg, 0);
518         body->data = OBD_BRW_WRITE;
519
520         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
521         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
522         for (pages = 0, i = 0; i < num_oa; i++) {
523                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
524                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
525                         ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
526                                         count[pages], flags[pages], 0);
527                 }
528         }
529         memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
530
531         size[1] = pages * sizeof(struct niobuf);
532         request->rq_replen = lustre_msg_size(2, size);
533
534         rc = ptlrpc_queue_wait(cl, request);
535         if (rc)
536                 GOTO(out, rc);
537
538         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
539         if (ptr2 == NULL)
540                 GOTO(out, rc = -EINVAL);
541
542         if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
543                 CERROR("buffer length wrong (%d vs. %d)\n",
544                        request->rq_repmsg->buflens[1],
545                        pages * sizeof(struct niobuf));
546                 GOTO(out, rc = -EINVAL);
547         }
548
549         for (pages = 0, i = 0; i < num_oa; i++) {
550                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
551                         struct niobuf *dst;
552                         ost_unpack_niobuf(&ptr2, &dst);
553                         osc_sendpage(conn, request, dst, &src[pages]);
554                 }
555         }
556         OBD_FREE(src, size[2]);
557  out:
558         for (pages = 0, i = 0; i < num_oa; i++)
559                 for (j = 0; j < oa_bufs[i]; j++, pages++)
560                         kunmap(buf[pages]);
561
562         ptlrpc_free_req(request);
563         return 0;
564 }
565
566 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
567               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
568               obd_size *count, obd_off *offset, obd_flag *flags)
569 {
570         if (rw == OBD_BRW_READ)
571                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
572                                     offset, flags);
573         else
574                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
575                                      offset, flags);
576 }
577
578 /* mount the file system (secretly) */
579 static int osc_setup(struct obd_device *obddev, obd_count len,
580                         void *buf)
581
582 {
583         struct osc_obd *osc = &obddev->u.osc;
584         struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
585         int rc;
586         int dev = data->ioc_dev;
587         ENTRY;
588
589         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
590         if (osc->osc_client == NULL)
591                 RETURN(-ENOMEM);
592
593         ptlrpc_init_client(dev, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
594                                    osc->osc_client);
595         rc = ptlrpc_connect_client("ost", osc->osc_client, &osc->osc_peer);
596
597         if (rc == 0)
598                 MOD_INC_USE_COUNT;
599         RETURN(rc);
600 }
601
602 static int osc_cleanup(struct obd_device * obddev)
603 {
604         struct osc_obd *osc = &obddev->u.osc;
605
606         if (osc->osc_client != NULL)
607                 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
608
609         MOD_DEC_USE_COUNT;
610         return 0;
611 }
612
613 struct obd_ops osc_obd_ops = {
614         o_setup:   osc_setup,
615         o_cleanup: osc_cleanup,
616         o_create: osc_create,
617         o_destroy: osc_destroy,
618         o_getattr: osc_getattr,
619         o_setattr: osc_setattr,
620         o_open: osc_open,
621         o_close: osc_close,
622         o_connect: osc_connect,
623         o_disconnect: osc_disconnect,
624         o_brw: osc_brw,
625         o_punch: osc_punch
626 };
627
628 static int __init osc_init(void)
629 {
630         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
631         return 0;
632 }
633
634 static void __exit osc_exit(void)
635 {
636         obd_unregister_type(LUSTRE_OSC_NAME);
637 }
638
639 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
640 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
641 MODULE_LICENSE("GPL");
642
643 module_init(osc_init);
644 module_exit(osc_exit);