Whamcloud - gitweb
Tiny cleanup in preprw to match commitrw.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  * 
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  * 
17  */
18
19 #define EXPORT_SYMTAB
20
21 #include <linux/config.h>
22 #include <linux/module.h>
23 #include <linux/kernel.h>
24 #include <linux/mm.h>
25 #include <linux/string.h>
26 #include <linux/stat.h>
27 #include <linux/errno.h>
28 #include <linux/locks.h>
29 #include <linux/unistd.h>
30
31 #include <asm/system.h>
32 #include <asm/uaccess.h>
33
34 #include <linux/fs.h>
35 #include <linux/stat.h>
36 #include <asm/uaccess.h>
37 #include <asm/segment.h>
38 #include <linux/miscdevice.h>
39
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <linux/obd_support.h>
43 #include <linux/obd_class.h>
44 #include <linux/lustre_lib.h>
45 #include <linux/lustre_idl.h>
46
47 struct ptlrpc_client *osc_con2cl(struct obd_conn *conn)
48 {
49         struct osc_obd *osc = &conn->oc_dev->u.osc;
50         return &osc->osc_peer;
51
52 }
53
54 static int osc_connect(struct obd_conn *conn)
55 {
56         struct ptlrpc_request *request;
57         struct ptlrpc_client *peer = osc_con2cl(conn);
58         int rc; 
59         ENTRY;
60         
61         request = ptlrpc_prep_req(peer, OST_CONNECT, 0, NULL, 0, NULL);
62         if (!request) { 
63                 CERROR("cannot pack req!\n"); 
64                 return -ENOMEM;
65         }
66
67         request->rq_replen = 
68                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
69
70         rc = ptlrpc_queue_wait(peer, request);
71         if (rc) { 
72                 EXIT;
73                 goto out;
74         }
75       
76         CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid); 
77
78         conn->oc_id = request->rq_rep.ost->connid;
79  out:
80         ptlrpc_free_req(request);
81         EXIT;
82         return rc;
83 }
84
85 static int osc_disconnect(struct obd_conn *conn)
86 {
87         struct ptlrpc_request *request;
88         struct ptlrpc_client *peer = osc_con2cl(conn);
89         int rc; 
90         ENTRY;
91         
92         request = ptlrpc_prep_req(peer, OST_DISCONNECT, 0, NULL, 0, NULL);
93         if (!request) { 
94                 CERROR("cannot pack req!\n"); 
95                 return -ENOMEM;
96         }
97         request->rq_req.ost->connid = conn->oc_id;
98         request->rq_replen = 
99                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
100
101         rc = ptlrpc_queue_wait(peer, request);
102         if (rc) { 
103                 EXIT;
104                 goto out;
105         }
106  out:
107         ptlrpc_free_req(request);
108         EXIT;
109         return rc;
110 }
111
112
113 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
114 {
115         struct ptlrpc_request *request;
116         struct ptlrpc_client *peer = osc_con2cl(conn);
117         int rc; 
118
119         request = ptlrpc_prep_req(peer, OST_GETATTR, 0, NULL, 0, NULL);
120         if (!request) { 
121                 CERROR("cannot pack req!\n"); 
122                 return -ENOMEM;
123         }
124         
125         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
126         request->rq_req.ost->oa.o_valid = ~0;
127         request->rq_replen = 
128                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
129         
130         rc = ptlrpc_queue_wait(peer, request);
131         if (rc) { 
132                 EXIT;
133                 goto out;
134         }
135
136         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
137         if (oa) { 
138                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
139         }
140
141  out:
142         ptlrpc_free_req(request);
143         return 0;
144 }
145
146 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
147 {
148         struct ptlrpc_request *request;
149         struct ptlrpc_client *peer = osc_con2cl(conn);
150         int rc; 
151
152         request = ptlrpc_prep_req(peer, OST_SETATTR, 0, NULL, 0, NULL);
153         if (!request) { 
154                 CERROR("cannot pack req!\n"); 
155                 return -ENOMEM;
156         }
157         
158         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
159         request->rq_replen = 
160                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
161         
162         rc = ptlrpc_queue_wait(peer, request);
163         if (rc) { 
164                 EXIT;
165                 goto out;
166         }
167
168  out:
169         ptlrpc_free_req(request);
170         return 0;
171 }
172
173 static int osc_create(struct obd_conn *conn, struct obdo *oa)
174 {
175         struct ptlrpc_request *request;
176         struct ptlrpc_client *peer = osc_con2cl(conn);
177         int rc; 
178
179         if (!oa) { 
180                 CERROR("oa NULL\n"); 
181         }
182         request = ptlrpc_prep_req(peer, OST_CREATE, 0, NULL, 0, NULL);
183         if (!request) { 
184                 CERROR("cannot pack req!\n"); 
185                 return -ENOMEM;
186         }
187         
188         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
189         request->rq_req.ost->connid = conn->oc_id;
190         request->rq_req.ost->oa.o_valid = ~0;
191         request->rq_replen = 
192                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
193         
194         rc = ptlrpc_queue_wait(peer, request);
195         if (rc) { 
196                 EXIT;
197                 goto out;
198         }
199         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
200
201  out:
202         ptlrpc_free_req(request);
203         return 0;
204 }
205
206 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
207                      obd_off offset)
208 {
209         struct ptlrpc_request *request;
210         struct ptlrpc_client *peer = osc_con2cl(conn);
211         int rc; 
212
213         if (!oa) { 
214                 CERROR("oa NULL\n"); 
215         }
216         request = ptlrpc_prep_req(peer, OST_PUNCH, 0, NULL, 0, NULL);
217         if (!request) { 
218                 CERROR("cannot pack req!\n"); 
219                 return -ENOMEM;
220         }
221         
222         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
223         request->rq_req.ost->oa.o_valid = ~0;
224         request->rq_req.ost->oa.o_size = offset;
225         request->rq_req.ost->oa.o_blocks = count;
226         request->rq_replen = 
227                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
228         
229         rc = ptlrpc_queue_wait(peer, request);
230         if (rc) { 
231                 EXIT;
232                 goto out;
233         }
234         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
235
236  out:
237         ptlrpc_free_req(request);
238         return 0;
239 }
240
241 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
242 {
243         struct ptlrpc_request *request;
244         struct ptlrpc_client *peer = osc_con2cl(conn);
245         int rc; 
246
247         if (!oa) { 
248                 CERROR("oa NULL\n"); 
249         }
250         request = ptlrpc_prep_req(peer, OST_DESTROY, 0, NULL, 0, NULL);
251         if (!request) { 
252                 CERROR("cannot pack req!\n"); 
253                 return -ENOMEM;
254         }
255         
256         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
257         request->rq_req.ost->oa.o_valid = ~0;
258         request->rq_replen = 
259                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
260         
261         rc = ptlrpc_queue_wait(peer, request);
262         if (rc) { 
263                 EXIT;
264                 goto out;
265         }
266         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
267
268  out:
269         ptlrpc_free_req(request);
270         return 0;
271 }
272
273 int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req,
274                  struct niobuf *dst, struct niobuf *src)
275 {
276         struct ptlrpc_client *cl = osc_con2cl(conn);
277
278         if (cl->cli_obd) {
279                 /* local sendpage */
280                 memcpy((char *)(unsigned long)dst->addr,
281                        (char *)(unsigned long)src->addr, src->len);
282         } else {
283                 struct ptlrpc_bulk_desc *bulk;
284                 int rc;
285
286                 bulk = ptlrpc_prep_bulk(&cl->cli_server);
287                 if (bulk == NULL)
288                         return -ENOMEM;
289
290                 bulk->b_buf = (void *)(unsigned long)src->addr;
291                 bulk->b_buflen = src->len;
292                 bulk->b_xid = dst->xid;
293                 rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL);
294                 if (rc != 0) {
295                         CERROR("send_bulk failed: %d\n", rc);
296                         BUG();
297                         return rc;
298                 }
299                 wait_event_interruptible(bulk->b_waitq,
300                                          ptlrpc_check_bulk_sent(bulk));
301
302                 if (bulk->b_flags == PTL_RPC_INTR) {
303                         EXIT;
304                         /* FIXME: hey hey, we leak here. */
305                         return -EINTR;
306                 }
307
308                 OBD_FREE(bulk, sizeof(*bulk));
309         }
310
311         return 0;
312 }
313
314 int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
315                  obd_count *oa_bufs, struct page **buf, obd_size *count,
316                  obd_off *offset, obd_flag *flags)
317 {
318         struct ptlrpc_client *cl = osc_con2cl(conn);
319         struct ptlrpc_request *request;
320         int pages;
321         int rc; 
322         struct obd_ioobj ioo;
323         struct niobuf src;
324         int size1, size2 = 0; 
325         void *ptr1, *ptr2;
326         int i, j, n;
327         struct ptlrpc_bulk_desc **bulk;
328
329         size1 = num_oa * sizeof(ioo); 
330         pages = 0;
331         for (i = 0; i < num_oa; i++)
332                 pages += oa_bufs[i];
333         size2 = pages * sizeof(src);
334
335         request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
336         if (!request) { 
337                 CERROR("cannot pack req!\n"); 
338                 return -ENOMEM;
339         }
340         request->rq_req.ost->cmd = OBD_BRW_READ;
341
342         OBD_ALLOC(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
343         if (bulk == NULL) {
344                 CERROR("cannot alloc bulk desc vector\n");
345                 return -ENOMEM;
346         }
347         memset(bulk, 0, pages * sizeof(struct ptlrpc_bulk_desc *));
348
349         n = 0;
350         ptr1 = ost_req_buf1(request->rq_req.ost);
351         ptr2 = ost_req_buf2(request->rq_req.ost);
352         for (i = 0; i < num_oa; i++) {
353                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
354                 for (j = 0; j < oa_bufs[i]; j++) {
355                         bulk[n] = ptlrpc_prep_bulk(&cl->cli_server);
356                         if (bulk[n] == NULL) {
357                                 CERROR("cannot alloc bulk desc\n");
358                                 rc = -ENOMEM;
359                                 goto out;
360                         }
361
362                         spin_lock(&cl->cli_lock);
363                         bulk[n]->b_xid = cl->cli_xid++;
364                         spin_unlock(&cl->cli_lock);
365                         bulk[n]->b_buf = kmap(buf[n]);
366                         bulk[n]->b_buflen = PAGE_SIZE;
367                         bulk[n]->b_portal = OST_BULK_PORTAL;
368                         ost_pack_niobuf(&ptr2, bulk[n]->b_buf, offset[n],
369                                         count[n], flags[n], bulk[n]->b_xid);
370
371                         rc = ptlrpc_register_bulk(bulk[n]);
372                         if (rc)
373                                 goto out;
374                         n++;
375                 }
376         }
377
378         request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
379         rc = ptlrpc_queue_wait(cl, request);
380
381  out:
382         /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to
383          * abort those bulk listeners. */
384
385         n = 0;
386         for (i = 0; i < num_oa; i++) {
387                 for (j = 0; j < oa_bufs[i]; j++) {
388                         if (bulk[n] == NULL)
389                                 continue;
390                         kunmap(buf[n]);
391                         OBD_FREE(bulk[n], sizeof(struct ptlrpc_bulk_desc));
392                         n++;
393                 }
394         }
395
396         OBD_FREE(bulk, pages * sizeof(struct ptlrpc_bulk_desc *));
397         ptlrpc_free_req(request);
398         return rc;
399 }
400
401 int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa,
402                   obd_count *oa_bufs, struct page **buf, obd_size *count,
403                   obd_off *offset, obd_flag *flags)
404 {
405         struct ptlrpc_client *cl = osc_con2cl(conn);
406         struct ptlrpc_request *request;
407         struct obd_ioobj ioo;
408         struct niobuf *src;
409         int pages, rc, i, j, n, size1, size2 = 0; 
410         void *ptr1, *ptr2;
411
412         size1 = num_oa * sizeof(ioo); 
413         pages = 0;
414         for (i = 0; i < num_oa; i++)
415                 pages += oa_bufs[i];
416         size2 = pages * sizeof(*src);
417
418         OBD_ALLOC(src, size2);
419         if (!src) { 
420                 CERROR("no src memory\n");
421                 return -ENOMEM;
422         }
423         memset((char *)src, 0, size2);
424
425         request = ptlrpc_prep_req(cl, OST_BRW, size1, NULL, size2, NULL);
426         if (!request) { 
427                 CERROR("cannot pack req!\n"); 
428                 return -ENOMEM;
429         }
430         request->rq_req.ost->cmd = OBD_BRW_WRITE;
431
432         n = 0;
433         ptr1 = ost_req_buf1(request->rq_req.ost);
434         ptr2 = ost_req_buf2(request->rq_req.ost);
435         for (i = 0; i < num_oa; i++) {
436                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
437                 for (j = 0; j < oa_bufs[i]; j++) {
438                         ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
439                                         count[n], flags[n], 0);
440                         n++;
441                 }
442         }
443         memcpy((char *)src, (char *)ost_req_buf2(request->rq_req.ost), size2); 
444
445         request->rq_replen = sizeof(struct ptlrep_hdr) +
446                 sizeof(struct ost_rep) + pages * sizeof(struct niobuf);
447         rc = ptlrpc_queue_wait(cl, request);
448         if (rc) { 
449                 EXIT;
450                 goto out;
451         }
452
453         ptr2 = ost_rep_buf2(request->rq_rep.ost);
454         if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) {
455                 CERROR("buffer length wrong (%d vs. %d)\n",
456                        request->rq_rep.ost->buflen2, n * sizeof(struct niobuf));
457                 EXIT;
458                 goto out;
459         }
460
461         n = 0;
462         for (i = 0; i < num_oa; i++) {
463                 for (j = 0; j < oa_bufs[i]; j++) {
464                         struct niobuf *dst;
465                         ost_unpack_niobuf(&ptr2, &dst);
466                         osc_sendpage(conn, request, dst, &src[n]);
467                         n++;
468                 }
469         }
470         OBD_FREE(src, size2);
471  out:
472         n = 0;
473         for (i = 0; i < num_oa; i++) {
474                 for (j = 0; j < oa_bufs[i]; j++) {
475                         kunmap(buf[n]);
476                         n++;
477                 }
478         }
479
480         ptlrpc_free_req(request);
481         return 0;
482 }
483
484 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
485               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
486               obd_size *count, obd_off *offset, obd_flag *flags)
487 {
488         if (rw == OBD_BRW_READ) {
489                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
490                                     offset, flags);
491         } else {
492                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
493                                      offset, flags);
494         }
495 }
496
497 /* mount the file system (secretly) */
498 static int osc_setup(struct obd_device *obddev, obd_count len,
499                         void *buf)
500                         
501 {
502         struct osc_obd *osc = &obddev->u.osc;
503         struct obd_ioctl_data *data = (struct obd_ioctl_data *)buf;
504         int rc;
505         int dev = data->ioc_dev;
506         ENTRY;
507
508         rc = ptlrpc_connect_client(dev, "ost", 
509                                    OST_REQUEST_PORTAL, 
510                                    OSC_REPLY_PORTAL,    
511                                    ost_pack_req, 
512                                    ost_unpack_rep,
513                                    &osc->osc_peer); 
514
515         MOD_INC_USE_COUNT;
516         EXIT;
517         return rc;
518
519
520 static int osc_cleanup(struct obd_device * obddev)
521 {
522         MOD_DEC_USE_COUNT;
523         return 0;
524 }
525
526 struct obd_ops osc_obd_ops = { 
527         o_setup:   osc_setup,
528         o_cleanup: osc_cleanup, 
529         o_create: osc_create,
530         o_destroy: osc_destroy,
531         o_getattr: osc_getattr,
532         o_setattr: osc_setattr,
533         o_connect: osc_connect,
534         o_disconnect: osc_disconnect,
535         o_brw: osc_brw,
536         o_punch: osc_punch
537 };
538
539 static int __init osc_init(void)
540 {
541         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
542         return 0;
543 }
544
545 static void __exit osc_exit(void)
546 {
547         obd_unregister_type(LUSTRE_OSC_NAME);
548 }
549
550 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
551 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
552 MODULE_LICENSE("GPL"); 
553
554 module_init(osc_init);
555 module_exit(osc_exit);