Whamcloud - gitweb
b8869badebe81a90e4ce9d4a13c9715b60bd59d7
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/config.h>
22 #include <linux/module.h>
23 #include <linux/kernel.h>
24 #include <linux/mm.h>
25 #include <linux/string.h>
26 #include <linux/stat.h>
27 #include <linux/errno.h>
28 #include <linux/locks.h>
29 #include <linux/unistd.h>
30
31 #include <asm/system.h>
32 #include <asm/uaccess.h>
33
34 #include <linux/fs.h>
35 #include <linux/stat.h>
36 #include <asm/uaccess.h>
37 #include <asm/segment.h>
38 #include <linux/miscdevice.h>
39
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <linux/obd_class.h>
43 #include <linux/lustre_lib.h>
44 #include <linux/lustre_net.h>
45 #include <linux/obd_ost.h>
46
47 struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
48 {
49         struct osc_obd *osc = &conn->oc_dev->u.osc;
50         return osc->osc_client;
51 }
52
53 static int osc_connect(struct obd_conn *conn)
54 {
55         struct ptlrpc_request *request;
56         struct ptlrpc_client *cl = osc_con2cl(conn);
57         struct ost_body *body;
58         int rc, size = sizeof(*body);
59         ENTRY;
60
61         request = ptlrpc_prep_req(cl, OST_CONNECT, 0, NULL, NULL);
62         if (!request)
63                 RETURN(-ENOMEM);
64
65         request->rq_replen = lustre_msg_size(1, &size);
66
67         rc = ptlrpc_queue_wait(cl, request);
68         if (rc)
69                 GOTO(out, rc);
70
71         body = lustre_msg_buf(request->rq_repmsg, 0);
72         CDEBUG(D_INODE, "received connid %d\n", body->connid);
73
74         conn->oc_id = body->connid;
75         EXIT;
76  out:
77         ptlrpc_free_req(request);
78         return rc;
79 }
80
81 static int osc_disconnect(struct obd_conn *conn)
82 {
83         struct ptlrpc_request *request;
84         struct ptlrpc_client *cl = osc_con2cl(conn);
85         struct ost_body *body;
86         int rc, size = sizeof(*body);
87         ENTRY;
88
89         request = ptlrpc_prep_req(cl, OST_DISCONNECT, 1, &size, NULL);
90         if (!request)
91                 RETURN(-ENOMEM);
92
93         body = lustre_msg_buf(request->rq_reqmsg, 0);
94         body->connid = conn->oc_id;
95
96         request->rq_replen = lustre_msg_size(1, &size);
97
98         rc = ptlrpc_queue_wait(cl, request);
99         GOTO(out, rc);
100  out:
101         ptlrpc_free_req(request);
102         return rc;
103 }
104
105 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
106 {
107         struct ptlrpc_request *request;
108         struct ptlrpc_client *cl = osc_con2cl(conn);
109         struct ost_body *body;
110         int rc, size = sizeof(*body);
111         ENTRY;
112
113         request = ptlrpc_prep_req(cl, OST_GETATTR, 1, &size, NULL);
114         if (!request)
115                 RETURN(-ENOMEM);
116
117         body = lustre_msg_buf(request->rq_reqmsg, 0);
118         memcpy(&body->oa, oa, sizeof(*oa));
119         body->connid = conn->oc_id;
120         body->oa.o_valid = ~0;
121
122         request->rq_replen = lustre_msg_size(1, &size);
123
124         rc = ptlrpc_queue_wait(cl, request);
125         if (rc)
126                 GOTO(out, rc);
127
128         body = lustre_msg_buf(request->rq_repmsg, 0);
129         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
130         if (oa)
131                 memcpy(oa, &body->oa, sizeof(*oa));
132
133         EXIT;
134  out:
135         ptlrpc_free_req(request);
136         return 0;
137 }
138
139 static int osc_open(struct obd_conn *conn, struct obdo *oa)
140 {
141         struct ptlrpc_request *request;
142         struct ptlrpc_client *cl = osc_con2cl(conn);
143         struct ost_body *body;
144         int rc, size = sizeof(*body);
145         ENTRY;
146
147         request = ptlrpc_prep_req(cl, OST_OPEN, 1, &size, NULL);
148         if (!request)
149                 RETURN(-ENOMEM);
150
151         body = lustre_msg_buf(request->rq_reqmsg, 0);
152         memcpy(&body->oa, oa, sizeof(*oa));
153         body->connid = conn->oc_id;
154         if (body->oa.o_valid != (OBD_MD_FLMODE | OBD_MD_FLID))
155                 LBUG();
156
157         request->rq_replen = lustre_msg_size(1, &size);
158
159         rc = ptlrpc_queue_wait(cl, request);
160         if (rc)
161                 GOTO(out, rc);
162
163         body = lustre_msg_buf(request->rq_repmsg, 0);
164         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
165         if (oa)
166                 memcpy(oa, &body->oa, sizeof(*oa));
167
168         EXIT;
169  out:
170         ptlrpc_free_req(request);
171         return 0;
172 }
173
174 static int osc_close(struct obd_conn *conn, struct obdo *oa)
175 {
176         struct ptlrpc_request *request;
177         struct ptlrpc_client *cl = osc_con2cl(conn);
178         struct ost_body *body;
179         int rc, size = sizeof(*body);
180         ENTRY;
181
182         request = ptlrpc_prep_req(cl, OST_CLOSE, 1, &size, NULL);
183         if (!request)
184                 RETURN(-ENOMEM);
185
186         body = lustre_msg_buf(request->rq_reqmsg, 0);
187         memcpy(&body->oa, oa, sizeof(*oa));
188         body->connid = conn->oc_id;
189
190         request->rq_replen = lustre_msg_size(1, &size);
191
192         rc = ptlrpc_queue_wait(cl, request);
193         if (rc)
194                 GOTO(out, rc);
195
196         body = lustre_msg_buf(request->rq_repmsg, 0);
197         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
198         if (oa)
199                 memcpy(oa, &body->oa, sizeof(*oa));
200
201         EXIT;
202  out:
203         ptlrpc_free_req(request);
204         return 0;
205 }
206
207 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
208 {
209         struct ptlrpc_request *request;
210         struct ptlrpc_client *cl = osc_con2cl(conn);
211         struct ost_body *body;
212         int rc, size = sizeof(*body);
213         ENTRY;
214
215         request = ptlrpc_prep_req(cl, OST_SETATTR, 1, &size, NULL);
216         if (!request)
217                 RETURN(-ENOMEM);
218
219         body = lustre_msg_buf(request->rq_reqmsg, 0);
220         memcpy(&body->oa, oa, sizeof(*oa));
221         body->connid = conn->oc_id;
222
223         request->rq_replen = lustre_msg_size(1, &size);
224
225         rc = ptlrpc_queue_wait(cl, request);
226         GOTO(out, rc);
227
228  out:
229         ptlrpc_free_req(request);
230         return 0;
231 }
232
233 static int osc_create(struct obd_conn *conn, struct obdo *oa)
234 {
235         struct ptlrpc_request *request;
236         struct ptlrpc_client *cl = osc_con2cl(conn);
237         struct ost_body *body;
238         int rc, size = sizeof(*body);
239         ENTRY;
240
241         if (!oa) {
242                 CERROR("oa NULL\n");
243                 RETURN(-EINVAL);
244         }
245         request = ptlrpc_prep_req(cl, OST_CREATE, 1, &size, NULL);
246         if (!request)
247                 RETURN(-ENOMEM);
248
249         body = lustre_msg_buf(request->rq_reqmsg, 0);
250         memcpy(&body->oa, oa, sizeof(*oa));
251         body->oa.o_valid = ~0;
252         body->connid = conn->oc_id;
253
254         request->rq_replen = lustre_msg_size(1, &size);
255
256         rc = ptlrpc_queue_wait(cl, request);
257         if (rc)
258                 GOTO(out, rc);
259
260         body = lustre_msg_buf(request->rq_repmsg, 0);
261         memcpy(oa, &body->oa, sizeof(*oa));
262
263         EXIT;
264  out:
265         ptlrpc_free_req(request);
266         return 0;
267 }
268
269 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
270                      obd_off offset)
271 {
272         struct ptlrpc_request *request;
273         struct ptlrpc_client *cl = osc_con2cl(conn);
274         struct ost_body *body;
275         int rc, size = sizeof(*body);
276         ENTRY;
277
278         if (!oa) {
279                 CERROR("oa NULL\n");
280                 RETURN(-EINVAL);
281         }
282         request = ptlrpc_prep_req(cl, OST_PUNCH, 1, &size, NULL);
283         if (!request)
284                 RETURN(-ENOMEM);
285
286         body = lustre_msg_buf(request->rq_reqmsg, 0);
287         memcpy(&body->oa, oa, sizeof(*oa));
288         body->connid = conn->oc_id;
289         body->oa.o_valid = ~0;
290         body->oa.o_size = offset;
291         body->oa.o_blocks = count;
292
293         request->rq_replen = lustre_msg_size(1, &size);
294
295         rc = ptlrpc_queue_wait(cl, request);
296         if (rc)
297                 GOTO(out, rc);
298
299         body = lustre_msg_buf(request->rq_repmsg, 0);
300         memcpy(oa, &body->oa, sizeof(*oa));
301
302         EXIT;
303  out:
304         ptlrpc_free_req(request);
305         return 0;
306 }
307
308 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
309 {
310         struct ptlrpc_request *request;
311         struct ptlrpc_client *cl = osc_con2cl(conn);
312         struct ost_body *body;
313         int rc, size = sizeof(*body);
314         ENTRY;
315
316         if (!oa) {
317                 CERROR("oa NULL\n");
318                 RETURN(-EINVAL);
319         }
320         request = ptlrpc_prep_req(cl, OST_DESTROY, 1, &size, NULL);
321         if (!request)
322                 RETURN(-ENOMEM);
323
324         body = lustre_msg_buf(request->rq_reqmsg, 0);
325         memcpy(&body->oa, oa, sizeof(*oa));
326         body->connid = conn->oc_id;
327         body->oa.o_valid = ~0;
328
329         request->rq_replen = lustre_msg_size(1, &size);
330
331         rc = ptlrpc_queue_wait(cl, request);
332         if (rc)
333                 GOTO(out, rc);
334
335         body = lustre_msg_buf(request->rq_repmsg, 0);
336         memcpy(oa, &body->oa, sizeof(*oa));
337
338         EXIT;
339  out:
340         ptlrpc_free_req(request);
341         return 0;
342 }
343
344 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
345                  struct niobuf *dst, struct niobuf *src)
346 {
347         struct ptlrpc_client *cl = osc_con2cl(conn);
348
349         if (cl->cli_obd) {
350                 /* local sendpage */
351                 memcpy((char *)(unsigned long)dst->addr,
352                        (char *)(unsigned long)src->addr, src->len);
353         } else {
354                 struct ptlrpc_bulk_desc *bulk;
355                 int rc;
356
357                 bulk = ptlrpc_prep_bulk(&cl->cli_server);
358                 if (bulk == NULL)
359                         return -ENOMEM;
360
361                 bulk->b_buf = (void *)(unsigned long)src->addr;
362                 bulk->b_buflen = src->len;
363                 bulk->b_xid = dst->xid;
364                 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
365                 if (rc != 0) {
366                         CERROR("send_bulk failed: %d\n", rc);
367                         LBUG();
368                         return rc;
369                 }
370                 wait_event_interruptible(bulk->b_waitq,
371                                          ptlrpc_check_bulk_sent(bulk));
372
373                 if (bulk->b_flags == PTL_RPC_INTR) {
374                         EXIT;
375                         /* FIXME: hey hey, we leak here. */
376                         return -EINTR;
377                 }
378
379                 OBD_FREE(bulk, sizeof(*bulk));
380         }
381
382         return 0;
383 }
384
385 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
386                  obd_count *oa_bufs, struct page **buf, obd_size *count,
387                  obd_off *offset, obd_flag *flags)
388 {
389         struct ptlrpc_client *cl = osc_con2cl(conn);
390         struct ptlrpc_request *request;
391         struct ost_body *body;
392         struct obd_ioobj ioo;
393         struct niobuf src;
394         int pages, rc, i, j, size[3] = {sizeof(*body)};
395         void *ptr1, *ptr2;
396         struct ptlrpc_bulk_desc **bulk;
397         ENTRY;
398
399         size[1] = num_oa * sizeof(ioo);
400         pages = 0;
401         for (i = 0; i < num_oa; i++)
402                 pages += oa_bufs[i];
403         size[2] = pages * sizeof(src);
404
405         OBD_ALLOC(bulk, pages * sizeof(*bulk));
406         if (bulk == NULL)
407                 RETURN(-ENOMEM);
408
409         request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL);
410         if (!request)
411                 GOTO(out, rc = -ENOMEM);
412
413         body = lustre_msg_buf(request->rq_reqmsg, 0);
414         body->data = OBD_BRW_READ;
415
416         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
417         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
418         for (pages = 0, i = 0; i < num_oa; i++) {
419                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
420                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
421                         bulk[pages] = ptlrpc_prep_bulk(&cl->cli_server);
422                         if (bulk[pages] == NULL)
423                                 GOTO(out, rc = -ENOMEM);
424
425                         spin_lock(&cl->cli_lock);
426                         bulk[pages]->b_xid = cl->cli_xid++;
427                         spin_unlock(&cl->cli_lock);
428
429                         bulk[pages]->b_buf = kmap(buf[pages]);
430                         bulk[pages]->b_buflen = PAGE_SIZE;
431                         bulk[pages]->b_portal = OST_BULK_PORTAL;
432                         ost_pack_niobuf(&ptr2, bulk[pages]->b_buf,
433                                         offset[pages], count[pages],
434                                         flags[pages], bulk[pages]->b_xid);
435
436                         rc = ptlrpc_register_bulk(bulk[pages]);
437                         if (rc)
438                                 GOTO(out, rc);
439                 }
440         }
441
442         request->rq_replen = lustre_msg_size(1, size);
443         rc = ptlrpc_queue_wait(cl, request);
444         GOTO(out, rc);
445
446  out:
447         /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
448          * abort those bulk listeners. */
449
450         for (pages = 0, i = 0; i < num_oa; i++) {
451                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
452                         if (bulk[pages] == NULL)
453                                 continue;
454                         kunmap(buf[pages]);
455                         OBD_FREE(bulk[pages], sizeof(**bulk));
456                 }
457         }
458
459         OBD_FREE(bulk, pages * sizeof(*bulk));
460         ptlrpc_free_req(request);
461         return rc;
462 }
463
464 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
465                   obd_count *oa_bufs, struct page **buf, obd_size *count,
466                   obd_off *offset, obd_flag *flags)
467 {
468         struct ptlrpc_client *cl = osc_con2cl(conn);
469         struct ptlrpc_request *request;
470         struct obd_ioobj ioo;
471         struct ost_body *body;
472         struct niobuf *src;
473         int pages, rc, i, j, size[3] = {sizeof(*body)};
474         void *ptr1, *ptr2;
475         ENTRY;
476
477         size[1] = num_oa * sizeof(ioo);
478         pages = 0;
479         for (i = 0; i < num_oa; i++)
480                 pages += oa_bufs[i];
481         size[2] = pages * sizeof(*src);
482
483         OBD_ALLOC(src, size[2]);
484         if (!src)
485                 RETURN(-ENOMEM);
486
487         request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL);
488         if (!request)
489                 RETURN(-ENOMEM);
490         body = lustre_msg_buf(request->rq_reqmsg, 0);
491         body->data = OBD_BRW_WRITE;
492
493         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
494         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
495         for (pages = 0, i = 0; i < num_oa; i++) {
496                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
497                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
498                         ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages],
499                                         count[pages], flags[pages], 0);
500                 }
501         }
502         memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]);
503
504         size[1] = pages * sizeof(struct niobuf);
505         request->rq_replen = lustre_msg_size(2, size);
506
507         rc = ptlrpc_queue_wait(cl, request);
508         if (rc)
509                 GOTO(out, rc);
510
511         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
512         if (ptr2 == NULL)
513                 GOTO(out, rc = -EINVAL);
514
515         if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) {
516                 CERROR("buffer length wrong (%d vs. %d)\n",
517                        request->rq_repmsg->buflens[1],
518                        pages * sizeof(struct niobuf));
519                 GOTO(out, rc = -EINVAL);
520         }
521
522         for (pages = 0, i = 0; i < num_oa; i++) {
523                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
524                         struct niobuf *dst;
525                         ost_unpack_niobuf(&ptr2, &dst);
526                         osc_sendpage(conn, request, dst, &src[pages]);
527                 }
528         }
529         OBD_FREE(src, size[2]);
530  out:
531         for (pages = 0, i = 0; i < num_oa; i++)
532                 for (j = 0; j < oa_bufs[i]; j++, pages++)
533                         kunmap(buf[n]);
534
535         ptlrpc_free_req(request);
536         return 0;
537 }
538
539 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
540               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
541               obd_size *count, obd_off *offset, obd_flag *flags)
542 {
543         if (rw == OBD_BRW_READ)
544                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
545                                     offset, flags);
546         else
547                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
548                                      offset, flags);
549 }
550
551 /* mount the file system (secretly) */
552 static int osc_setup(struct obd_device *obddev, obd_count len,
553                         void *buf)
554
555 {
556         struct osc_obd *osc = &obddev->u.osc;
557         struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
558         int rc;
559         int dev = data->ioc_dev;
560         ENTRY;
561
562         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
563         if (osc->osc_client == NULL)
564                 RETURN(-ENOMEM);
565
566         ptlrpc_init_client(dev, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
567                                    osc->osc_client);
568         rc = ptlrpc_connect_client(dev, "ost", osc->osc_client);
569
570         if (rc == 0)
571                 MOD_INC_USE_COUNT;
572         RETURN(rc);
573 }
574
575 static int osc_cleanup(struct obd_device * obddev)
576 {
577         struct osc_obd *osc = &obddev->u.osc;
578
579         if (osc->osc_client != NULL)
580                 OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
581
582         MOD_DEC_USE_COUNT;
583         return 0;
584 }
585
586 struct obd_ops osc_obd_ops = {
587         o_setup:   osc_setup,
588         o_cleanup: osc_cleanup,
589         o_create: osc_create,
590         o_destroy: osc_destroy,
591         o_getattr: osc_getattr,
592         o_setattr: osc_setattr,
593         o_open: osc_open,
594         o_close: osc_close,
595         o_connect: osc_connect,
596         o_disconnect: osc_disconnect,
597         o_brw: osc_brw,
598         o_punch: osc_punch
599 };
600
601 static int __init osc_init(void)
602 {
603         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
604         return 0;
605 }
606
607 static void __exit osc_exit(void)
608 {
609         obd_unregister_type(LUSTRE_OSC_NAME);
610 }
611
612 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
613 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
614 MODULE_LICENSE("GPL");
615
616 module_init(osc_init);
617 module_exit(osc_exit);