Whamcloud - gitweb
bbb2f96c653573cfec9bb2a8c12bf3004ceff1b8
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* 
2  * Copyright (C) 2001 Cluster File Systems, Inc.
3  *
4  *  This code is issued under the GNU General Public License.
5  *  See the file COPYING in this distribution
6  *
7  *  Author Peter Braam <braam@clusterfs.com>
8  * 
9  *  This server is single threaded at present (but can easily be multi
10  *  threaded). For testing and management it is treated as an
11  *  obd_device, although it does not export a full OBD method table
12  *  (the requests are coming in over the wire, so object target
13  *  modules do not have a full method table.)
14  * 
15  */
16
17 #define EXPORT_SYMTAB
18
19 #include <linux/config.h>
20 #include <linux/module.h>
21 #include <linux/kernel.h>
22 #include <linux/mm.h>
23 #include <linux/string.h>
24 #include <linux/stat.h>
25 #include <linux/errno.h>
26 #include <linux/locks.h>
27 #include <linux/unistd.h>
28
29 #include <asm/system.h>
30 #include <asm/uaccess.h>
31
32 #include <linux/fs.h>
33 #include <linux/stat.h>
34 #include <asm/uaccess.h>
35 #include <asm/segment.h>
36 #include <linux/miscdevice.h>
37
38 #define DEBUG_SUBSYSTEM S_OSC
39
40 #include <linux/obd_support.h>
41 #include <linux/obd_class.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_idl.h>
44
/* Declared extern here; osc_queue_wait() calls it in its local-delivery branch. */
45 extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *);
46
47 /* FIXME: this belongs in some sort of service struct */
/* Counter from which ost_prep_req() assigns rq_xid to each new request
 * (plain osc_xid++, no locking visible in this file). */
48 static int osc_xid = 1;
49
50 struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1, 
51                                  int buflen2, char *buf2)
52 {
53         struct ptlrpc_request *request;
54         int rc;
55         ENTRY; 
56
57         OBD_ALLOC(request, sizeof(*request));
58         if (!request) { 
59                 CERROR("request allocation out of memory\n");
60                 return NULL;
61         }
62
63         memset(request, 0, sizeof(*request));
64         request->rq_xid = osc_xid++;
65
66         rc = ost_pack_req(buf1, buflen1,  buf2, buflen2,
67                           &request->rq_reqhdr, &request->rq_req.ost, 
68                           &request->rq_reqlen, &request->rq_reqbuf);
69         if (rc) { 
70                 CERROR("llight request: cannot pack request %d\n", rc); 
71                 return NULL;
72         }
73         request->rq_reqhdr->opc = opcode;
74
75         EXIT;
76         return request;
77 }
78
79 /* XXX: unify with mdc_queue_wait */
80 extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req)
81 {
82         struct obd_device *client = conn->oc_dev;
83         struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer;
84         int rc;
85         DECLARE_WAITQUEUE(wait, current);
86
87         ENTRY;
88
89         /* set the connection id */
90         req->rq_req.ost->connid = conn->oc_id;
91         init_waitqueue_head(&req->rq_wait_for_rep);
92
93         /* XXX fix the race here (wait_for_event?)*/
94         if (peer == NULL) {
95                 /* Local delivery */
96                 CDEBUG(D_INODE, "\n");
97                 rc = ost_queue_req(client, req); 
98         } else {
99                 /* Remote delivery via portals. */
100                 req->rq_req_portal = OST_REQUEST_PORTAL;
101                 req->rq_reply_portal = OST_REPLY_PORTAL;
102                 rc = ptl_send_rpc(req, peer);
103         }
104         if (rc) { 
105                 CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc); 
106                 return -rc;
107         }
108
109         CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", 
110                &conn->oc_dev->u.osc.osc_tgt->u.ost, 
111                conn->oc_id, req->rq_reqhdr->opc, req);
112
113         /* wait for the reply */
114         CDEBUG(D_INODE, "-- sleeping\n");
115         add_wait_queue(&req->rq_wait_for_rep, &wait);
116         while (req->rq_repbuf == NULL) {
117                 set_current_state(TASK_INTERRUPTIBLE);
118
119                 /* if this process really wants to die, let it go */
120                 if (sigismember(&(current->pending.signal), SIGKILL) ||
121                     sigismember(&(current->pending.signal), SIGINT))
122                         break;
123
124                 schedule();
125         }
126         remove_wait_queue(&req->rq_wait_for_rep, &wait);
127         set_current_state(TASK_RUNNING);
128         CDEBUG(D_INODE, "-- done\n");
129
130         if (req->rq_repbuf == NULL) {
131                 /* We broke out because of a signal */
132                 EXIT;
133                 return -EINTR;
134         }
135
136         rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
137                             &req->rq_rep.ost); 
138         if (rc) {
139                 CERROR("mds_unpack_rep failed: %d\n", rc);
140                 return rc;
141         }
142
143         if ( req->rq_rephdr->status == 0 )
144                 CDEBUG(D_INODE, "buf %p len %d status %d\n", 
145                        req->rq_repbuf, req->rq_replen, 
146                        req->rq_rephdr->status); 
147
148         EXIT;
149         return 0;
150 }
151
152 static void osc_free_req(struct ptlrpc_request *request)
153 {
154         OBD_FREE(request, sizeof(*request));
155 }
156
157 static int osc_connect(struct obd_conn *conn)
158 {
159         struct ptlrpc_request *request;
160         int rc; 
161         ENTRY;
162         
163         request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL);
164         if (!request) { 
165                 CERROR("cannot pack req!\n"); 
166                 return -ENOMEM;
167         }
168
169         request->rq_replen = 
170                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
171
172         rc = osc_queue_wait(conn, request);
173         if (rc) { 
174                 EXIT;
175                 goto out;
176         }
177       
178         CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid); 
179
180         conn->oc_id = request->rq_rep.ost->connid;
181  out:
182         osc_free_req(request);
183         EXIT;
184         return rc;
185 }
186
187 static int osc_disconnect(struct obd_conn *conn)
188 {
189         struct ptlrpc_request *request;
190         int rc; 
191         ENTRY;
192         
193         request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL);
194         if (!request) { 
195                 CERROR("cannot pack req!\n"); 
196                 return -ENOMEM;
197         }
198
199         request->rq_replen = 
200                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
201
202         rc = osc_queue_wait(conn, request);
203         if (rc) { 
204                 EXIT;
205                 goto out;
206         }
207  out:
208         osc_free_req(request);
209         EXIT;
210         return rc;
211 }
212
213
214 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
215 {
216         struct ptlrpc_request *request;
217         int rc; 
218
219         request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL);
220         if (!request) { 
221                 CERROR("cannot pack req!\n"); 
222                 return -ENOMEM;
223         }
224         
225         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
226         request->rq_req.ost->oa.o_valid = ~0;
227         request->rq_replen = 
228                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
229         
230         rc = osc_queue_wait(conn, request);
231         if (rc) { 
232                 EXIT;
233                 goto out;
234         }
235
236         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
237         if (oa) { 
238                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
239         }
240
241  out:
242         osc_free_req(request);
243         return 0;
244 }
245
246 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
247 {
248         struct ptlrpc_request *request;
249         int rc; 
250
251         request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL);
252         if (!request) { 
253                 CERROR("cannot pack req!\n"); 
254                 return -ENOMEM;
255         }
256         
257         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
258         request->rq_replen = 
259                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
260         
261         rc = osc_queue_wait(conn, request);
262         if (rc) { 
263                 EXIT;
264                 goto out;
265         }
266
267  out:
268         osc_free_req(request);
269         return 0;
270 }
271
272 static int osc_create(struct obd_conn *conn, struct obdo *oa)
273 {
274         struct ptlrpc_request *request;
275         int rc; 
276
277         if (!oa) { 
278                 CERROR("oa NULL\n"); 
279         }
280         request = ost_prep_req(OST_CREATE, 0, NULL, 0, NULL);
281         if (!request) { 
282                 CERROR("cannot pack req!\n"); 
283                 return -ENOMEM;
284         }
285         
286         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
287         request->rq_req.ost->oa.o_valid = ~0;
288         request->rq_replen = 
289                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
290         
291         rc = osc_queue_wait(conn, request);
292         if (rc) { 
293                 EXIT;
294                 goto out;
295         }
296         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
297
298  out:
299         osc_free_req(request);
300         return 0;
301 }
302
303 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd_off offset)
304 {
305         struct ptlrpc_request *request;
306         int rc; 
307
308         if (!oa) { 
309                 CERROR("oa NULL\n"); 
310         }
311         request = ost_prep_req(OST_PUNCH, 0, NULL, 0, NULL);
312         if (!request) { 
313                 CERROR("cannot pack req!\n"); 
314                 return -ENOMEM;
315         }
316         
317         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
318         request->rq_req.ost->oa.o_valid = ~0;
319         request->rq_req.ost->oa.o_size = offset;
320         request->rq_req.ost->oa.o_blocks = count;
321         request->rq_replen = 
322                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
323         
324         rc = osc_queue_wait(conn, request);
325         if (rc) { 
326                 EXIT;
327                 goto out;
328         }
329         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
330
331  out:
332         osc_free_req(request);
333         return 0;
334 }
335
336 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
337 {
338         struct ptlrpc_request *request;
339         int rc; 
340
341         if (!oa) { 
342                 CERROR("oa NULL\n"); 
343         }
344         request = ost_prep_req(OST_DESTROY, 0, NULL, 0, NULL);
345         if (!request) { 
346                 CERROR("cannot pack req!\n"); 
347                 return -ENOMEM;
348         }
349         
350         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
351         request->rq_req.ost->oa.o_valid = ~0;
352         request->rq_replen = 
353                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
354         
355         rc = osc_queue_wait(conn, request);
356         if (rc) { 
357                 EXIT;
358                 goto out;
359         }
360         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
361
362  out:
363         osc_free_req(request);
364         return 0;
365 }
366
367
368 /* mount the file system (secretly) */
369 static int osc_setup(struct obd_device *obddev, obd_count len,
370                         void *buf)
371                         
372 {
373         struct obd_ioctl_data* data = buf;
374         struct osc_obd *osc = &obddev->u.osc;
375         ENTRY;
376
377         if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) {
378                 /* This is a local connection */
379                 osc->osc_tgt = &obd_dev[data->ioc_dev];
380
381                 CERROR("OSC: tgt %d ost at %p\n", data->ioc_dev,
382                        &osc->osc_tgt->u.ost);
383                 if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || 
384                      ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
385                         CERROR("device not attached or not set up (%d)\n", 
386                                data->ioc_dev);
387                         EXIT;
388                         return -EINVAL;
389                 }
390         } else {
391                 int err;
392                 /* This is a remote connection using Portals */
393
394                 /* XXX: this should become something like ioc_inlbuf1 */
395                 err = kportal_uuid_to_peer("ost", &osc->osc_peer);
396                 if (err != 0) {
397                         CERROR("Cannot find 'ost' peer.\n");
398                         EXIT;
399                         return -EINVAL;
400                 }
401         }
402
403         MOD_INC_USE_COUNT;
404         EXIT;
405         return 0;
406
407
408 int osc_sendpage(struct ptlrpc_request *req, struct niobuf *dst,
409                  struct niobuf *src)
410 {
411         if (req->rq_peer.peer_nid == 0) {
412                 /* local sendpage */
413                 memcpy((char *)(unsigned long)dst->addr,
414                        (char *)(unsigned long)src->addr, src->len);
415         } else {
416                 char *buf;
417                 int rc;
418
419                 OBD_ALLOC(buf, src->len);
420                 if (!buf)
421                         return -ENOMEM;
422
423                 memcpy(buf, (char *)(unsigned long)src->addr, src->len);
424
425                 req->rq_bulkbuf = buf;
426                 req->rq_bulklen = src->len;
427                 rc = ptl_send_buf(req, &req->rq_peer, OST_BULK_PORTAL, 0);
428                 init_waitqueue_head(&req->rq_wait_for_bulk);
429                 sleep_on(&req->rq_wait_for_bulk);
430                 OBD_FREE(buf, src->len);
431                 req->rq_bulklen = 0; /* FIXME: eek. */
432         }
433
434         return 0;
435 }
436
437
438 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
439               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
440               obd_size *count, obd_off *offset, obd_flag *flags)
441 {
442         struct ptlrpc_request *request;
443         int rc; 
444         struct obd_ioobj ioo;
445         struct niobuf src;
446         int size1, size2 = 0; 
447         void *ptr1, *ptr2;
448         int i, j, n;
449
450         size1 = num_oa * sizeof(ioo); 
451         for (i = 0; i < num_oa; i++) { 
452                 size2 += oa_bufs[i] * sizeof(src);
453         }
454
455         request = ost_prep_req(OST_BRW, size1, NULL, size2, NULL);
456         if (!request) { 
457                 CERROR("cannot pack req!\n"); 
458                 return -ENOMEM;
459         }
460
461         n = 0;
462         request->rq_req.ost->cmd = rw;
463         ptr1 = ost_req_buf1(request->rq_req.ost);
464         ptr2 = ost_req_buf2(request->rq_req.ost);
465         for (i = 0; i < num_oa; i++) {
466                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
467                 for (j = 0; j < oa_bufs[i]; j++) {
468                         ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
469                                         count[n], flags[n]); 
470                         n++;
471                 }
472         }
473
474         request->rq_bulk_portal = OST_BULK_PORTAL;
475         request->rq_replen = 
476                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + size2;
477
478         rc = osc_queue_wait(conn, request);
479         if (rc) { 
480                 EXIT;
481                 goto out;
482         }
483
484 #if 0
485         ptr2 = ost_rep_buf2(request->rq_rep.ost); 
486         if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) { 
487                 CERROR("buffer length wrong\n"); 
488                 goto out;
489         }
490
491         if (rw == OBD_BRW_READ)
492                 goto out;
493
494         for (i = 0; i < num_oa; i++) {
495                 for (j = 0; j < oa_bufs[i]; j++) {
496                         struct niobuf *dst;
497                         src.addr = (__u64)(unsigned long)buf[n];
498                         src.len = count[n];
499                         ost_unpack_niobuf(&ptr2, &dst);
500                         osc_sendpage(request, dst, &src);
501                         n++;
502                 }
503         }
504 #endif
505
506  out:
507         if (request->rq_rephdr)
508                 OBD_FREE(request->rq_rephdr, request->rq_replen);
509         n = 0;
510         for (i = 0; i < num_oa; i++) {
511                 for (j = 0; j < oa_bufs[i]; j++) {
512                         kunmap(buf[n]);
513                         n++;
514                 }
515         }
516
517         osc_free_req(request);
518         return 0;
519 }
520
521 static int osc_cleanup(struct obd_device * obddev)
522 {
523         MOD_DEC_USE_COUNT;
524         return 0;
525 }
526
527 struct obd_ops osc_obd_ops = { 
528         o_setup:   osc_setup,
529         o_cleanup: osc_cleanup, 
530         o_create: osc_create,
531         o_destroy: osc_destroy,
532         o_getattr: osc_getattr,
533         o_setattr: osc_setattr,
534         o_connect: osc_connect,
535         o_disconnect: osc_disconnect,
536         o_brw: osc_brw,
537         o_punch: osc_punch
538 };
539
540 static int __init osc_init(void)
541 {
542         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
543         return 0;
544 }
545
546 static void __exit osc_exit(void)
547 {
548         obd_unregister_type(LUSTRE_OSC_NAME);
549 }
550
/* Module metadata, followed by the hooks run on load (osc_init) and unload (osc_exit). */
551 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
552 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
553 MODULE_LICENSE("GPL"); 
554
555 module_init(osc_init);
556 module_exit(osc_exit);
557