Whamcloud - gitweb
- add the truncate obd call
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* 
2  * Copryright (C) 2001 Cluster File Systems, Inc.
3  *
4  *  This code is issued under the GNU General Public License.
5  *  See the file COPYING in this distribution
6  *
7  *  Author Peter Braam <braam@clusterfs.com>
8  * 
9  *  This server is single threaded at present (but can easily be multi
10  *  threaded). For testing and management it is treated as an
11  *  obd_device, although it does not export a full OBD method table
12  *  (the requests are coming in over the wire, so object target
13  *  modules do not have a full method table.)
14  * 
15  */
16
17 #define EXPORT_SYMTAB
18
19 #include <linux/config.h>
20 #include <linux/module.h>
21 #include <linux/kernel.h>
22 #include <linux/mm.h>
23 #include <linux/string.h>
24 #include <linux/stat.h>
25 #include <linux/errno.h>
26 #include <linux/locks.h>
27 #include <linux/unistd.h>
28
29 #include <asm/system.h>
30 #include <asm/uaccess.h>
31
32 #include <linux/fs.h>
33 #include <linux/stat.h>
34 #include <asm/uaccess.h>
35 #include <asm/segment.h>
36 #include <linux/miscdevice.h>
37
38 #define DEBUG_SUBSYSTEM S_OSC
39
40 #include <linux/obd_support.h>
41 #include <linux/obd_class.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_idl.h>
44
45 extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *);
46
47 /* FIXME: this belongs in some sort of service struct */
48 static int osc_xid = 1;
49
50 struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1, 
51                                  int buflen2, char *buf2)
52 {
53         struct ptlrpc_request *request;
54         int rc;
55         ENTRY; 
56
57         OBD_ALLOC(request, sizeof(*request));
58         if (!request) { 
59                 CERROR("request allocation out of memory\n");
60                 return NULL;
61         }
62
63         memset(request, 0, sizeof(*request));
64         request->rq_xid = osc_xid++;
65
66         rc = ost_pack_req(buf1, buflen1,  buf2, buflen2,
67                           &request->rq_reqhdr, &request->rq_req.ost, 
68                           &request->rq_reqlen, &request->rq_reqbuf);
69         if (rc) { 
70                 CERROR("llight request: cannot pack request %d\n", rc); 
71                 return NULL;
72         }
73         request->rq_reqhdr->opc = opcode;
74
75         EXIT;
76         return request;
77 }
78
79 /* XXX: unify with mdc_queue_wait */
80 extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req)
81 {
82         struct obd_device *client = conn->oc_dev;
83         struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer;
84         int rc;
85
86         ENTRY;
87
88         /* set the connection id */
89         req->rq_req.ost->connid = conn->oc_id;
90
91         /* XXX fix the race here (wait_for_event?)*/
92         if (peer == NULL) {
93                 /* Local delivery */
94                 CDEBUG(D_INODE, "\n");
95                 rc = ost_queue_req(client, req); 
96         } else {
97                 /* Remote delivery via portals. */
98                 req->rq_req_portal = OST_REQUEST_PORTAL;
99                 req->rq_reply_portal = OST_REPLY_PORTAL;
100                 rc = ptl_send_rpc(req, peer);
101         }
102         if (rc) { 
103                 CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc); 
104                 return -rc;
105         }
106
107         CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", 
108                &conn->oc_dev->u.osc.osc_tgt->u.ost, 
109                conn->oc_id, req->rq_reqhdr->opc, req);
110
111         /* wait for the reply */
112         init_waitqueue_head(&req->rq_wait_for_rep);
113         CDEBUG(D_INODE, "-- sleeping\n");
114         interruptible_sleep_on(&req->rq_wait_for_rep);
115         CDEBUG(D_INODE, "-- done\n");
116
117         rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
118                             &req->rq_rep.ost); 
119         if (rc) {
120                 CERROR("mds_unpack_rep failed: %d\n", rc);
121                 return rc;
122         }
123
124         if ( req->rq_rephdr->status == 0 )
125                 CDEBUG(D_INODE, "buf %p len %d status %d\n", 
126                        req->rq_repbuf, req->rq_replen, 
127                        req->rq_rephdr->status); 
128
129         EXIT;
130         return 0;
131 }
132
133 static void osc_free_req(struct ptlrpc_request *request)
134 {
135         OBD_FREE(request, sizeof(*request));
136 }
137
138 static int osc_connect(struct obd_conn *conn)
139 {
140         struct ptlrpc_request *request;
141         int rc; 
142         ENTRY;
143         
144         request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL);
145         if (!request) { 
146                 CERROR("cannot pack req!\n"); 
147                 return -ENOMEM;
148         }
149
150         request->rq_replen = 
151                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
152
153         rc = osc_queue_wait(conn, request);
154         if (rc) { 
155                 EXIT;
156                 goto out;
157         }
158       
159         CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid); 
160
161         conn->oc_id = request->rq_rep.ost->connid;
162  out:
163         osc_free_req(request);
164         EXIT;
165         return rc;
166 }
167
168 static int osc_disconnect(struct obd_conn *conn)
169 {
170         struct ptlrpc_request *request;
171         int rc; 
172         ENTRY;
173         
174         request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL);
175         if (!request) { 
176                 CERROR("cannot pack req!\n"); 
177                 return -ENOMEM;
178         }
179
180         request->rq_replen = 
181                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
182
183         rc = osc_queue_wait(conn, request);
184         if (rc) { 
185                 EXIT;
186                 goto out;
187         }
188  out:
189         osc_free_req(request);
190         EXIT;
191         return rc;
192 }
193
194
195 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
196 {
197         struct ptlrpc_request *request;
198         int rc; 
199
200         request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL);
201         if (!request) { 
202                 CERROR("cannot pack req!\n"); 
203                 return -ENOMEM;
204         }
205         
206         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
207         request->rq_req.ost->oa.o_valid = ~0;
208         request->rq_replen = 
209                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
210         
211         rc = osc_queue_wait(conn, request);
212         if (rc) { 
213                 EXIT;
214                 goto out;
215         }
216
217         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
218         if (oa) { 
219                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
220         }
221
222  out:
223         osc_free_req(request);
224         return 0;
225 }
226
227 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
228 {
229         struct ptlrpc_request *request;
230         int rc; 
231
232         request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL);
233         if (!request) { 
234                 CERROR("cannot pack req!\n"); 
235                 return -ENOMEM;
236         }
237         
238         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
239         request->rq_replen = 
240                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
241         
242         rc = osc_queue_wait(conn, request);
243         if (rc) { 
244                 EXIT;
245                 goto out;
246         }
247
248  out:
249         osc_free_req(request);
250         return 0;
251 }
252
253 static int osc_create(struct obd_conn *conn, struct obdo *oa)
254 {
255         struct ptlrpc_request *request;
256         int rc; 
257
258         if (!oa) { 
259                 CERROR("oa NULL\n"); 
260         }
261         request = ost_prep_req(OST_CREATE, 0, NULL, 0, NULL);
262         if (!request) { 
263                 CERROR("cannot pack req!\n"); 
264                 return -ENOMEM;
265         }
266         
267         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
268         request->rq_req.ost->oa.o_valid = ~0;
269         request->rq_replen = 
270                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
271         
272         rc = osc_queue_wait(conn, request);
273         if (rc) { 
274                 EXIT;
275                 goto out;
276         }
277         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
278
279  out:
280         osc_free_req(request);
281         return 0;
282 }
283
284 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd_off offset)
285 {
286         struct ptlrpc_request *request;
287         int rc; 
288
289         if (!oa) { 
290                 CERROR("oa NULL\n"); 
291         }
292         request = ost_prep_req(OST_PUNCH, 0, NULL, 0, NULL);
293         if (!request) { 
294                 CERROR("cannot pack req!\n"); 
295                 return -ENOMEM;
296         }
297         
298         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
299         request->rq_req.ost->oa.o_valid = ~0;
300         request->rq_req.ost->oa.o_size = offset;
301         request->rq_req.ost->oa.o_blocks = count;
302         request->rq_replen = 
303                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
304         
305         rc = osc_queue_wait(conn, request);
306         if (rc) { 
307                 EXIT;
308                 goto out;
309         }
310         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
311
312  out:
313         osc_free_req(request);
314         return 0;
315 }
316
317 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
318 {
319         struct ptlrpc_request *request;
320         int rc; 
321
322         if (!oa) { 
323                 CERROR("oa NULL\n"); 
324         }
325         request = ost_prep_req(OST_DESTROY, 0, NULL, 0, NULL);
326         if (!request) { 
327                 CERROR("cannot pack req!\n"); 
328                 return -ENOMEM;
329         }
330         
331         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
332         request->rq_req.ost->oa.o_valid = ~0;
333         request->rq_replen = 
334                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
335         
336         rc = osc_queue_wait(conn, request);
337         if (rc) { 
338                 EXIT;
339                 goto out;
340         }
341         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
342
343  out:
344         osc_free_req(request);
345         return 0;
346 }
347
348
349 /* mount the file system (secretly) */
350 static int osc_setup(struct obd_device *obddev, obd_count len,
351                         void *buf)
352                         
353 {
354         struct obd_ioctl_data* data = buf;
355         struct osc_obd *osc = &obddev->u.osc;
356         ENTRY;
357
358         if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) {
359                 /* This is a local connection */
360                 osc->osc_tgt = &obd_dev[data->ioc_dev];
361
362                 CERROR("OSC: tgt %d ost at %p\n", data->ioc_dev,
363                        &osc->osc_tgt->u.ost);
364                 if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || 
365                      ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
366                         CERROR("device not attached or not set up (%d)\n", 
367                                data->ioc_dev);
368                         EXIT;
369                         return -EINVAL;
370                 }
371         } else {
372                 int err;
373                 /* This is a remote connection using Portals */
374
375                 /* XXX: this should become something like ioc_inlbuf1 */
376                 err = kportal_uuid_to_peer("ost", &osc->osc_peer);
377                 if (err != 0) {
378                         CERROR("Cannot find 'ost' peer.\n");
379                         EXIT;
380                         return -EINVAL;
381                 }
382         }
383
384         MOD_INC_USE_COUNT;
385         EXIT;
386         return 0;
387
388
389 void osc_sendpage(struct niobuf *dst, struct niobuf *src)
390 {
391         memcpy((char *)(unsigned long)dst->addr,  
392                (char *)(unsigned long)src->addr, 
393                src->len);
394         return;
395 }
396
397
398 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
399               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
400               obd_size *count, obd_off *offset, obd_flag *flags)
401 {
402         struct ptlrpc_request *request;
403         int rc; 
404         struct obd_ioobj ioo;
405         struct niobuf src;
406         int size1, size2 = 0; 
407         void *ptr1, *ptr2;
408         int i, j, n;
409
410         size1 = num_oa * sizeof(ioo); 
411         for (i = 0; i < num_oa; i++) { 
412                 size2 += oa_bufs[i] * sizeof(src);
413         }
414
415         request = ost_prep_req(OST_BRW, size1, NULL, size2, NULL);
416         if (!request) { 
417                 CERROR("cannot pack req!\n"); 
418                 return -ENOMEM;
419         }
420
421         n = 0;
422         request->rq_req.ost->cmd = rw;
423         ptr1 = ost_req_buf1(request->rq_req.ost);
424         ptr2 = ost_req_buf2(request->rq_req.ost);
425         for (i=0; i < num_oa; i++) { 
426                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
427                 for (j = 0 ; j < oa_bufs[i] ; j++) { 
428                         ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
429                                         count[n], flags[n]); 
430                         n++;
431                 }
432         }
433
434         request->rq_replen = 
435                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + size2;
436
437         rc = osc_queue_wait(conn, request);
438         if (rc) { 
439                 EXIT;
440                 goto out;
441         }
442
443 #if 0
444         ptr2 = ost_rep_buf2(request->rq_rep.ost); 
445         if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) { 
446                 CERROR("buffer length wrong\n"); 
447                 goto out;
448         }
449
450         if (rw == OBD_BRW_READ)
451                 goto out;
452
453         for (i=0; i < num_oa; i++) { 
454                 for (j = 0 ; j < oa_bufs[i] ; j++) { 
455                         struct niobuf *dst;
456                         src.addr = (__u64)(unsigned long)buf[n];
457                         src.len = count[n];
458                         ost_unpack_niobuf(&ptr2, &dst);
459                         osc_sendpage(dst, &src);
460                         n++;
461                 }
462         }
463 #endif
464
465  out:
466         if (request->rq_rephdr)
467                 OBD_FREE(request->rq_rephdr, request->rq_replen);
468         n = 0;
469         for (i=0; i < num_oa; i++) { 
470                 for (j = 0 ; j < oa_bufs[i] ; j++) { 
471                         kunmap(buf[n]);
472                         n++;
473                 }
474         }
475
476         osc_free_req(request);
477         return 0;
478 }
479
480 static int osc_cleanup(struct obd_device * obddev)
481 {
482         MOD_DEC_USE_COUNT;
483         return 0;
484 }
485
486 struct obd_ops osc_obd_ops = { 
487         o_setup:   osc_setup,
488         o_cleanup: osc_cleanup, 
489         o_create: osc_create,
490         o_destroy: osc_destroy,
491         o_getattr: osc_getattr,
492         o_setattr: osc_setattr,
493         o_connect: osc_connect,
494         o_disconnect: osc_disconnect,
495         o_brw: osc_brw,
496         o_punch: osc_punch
497 };
498
499 static int __init osc_init(void)
500 {
501         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
502         return 0;
503 }
504
505 static void __exit osc_exit(void)
506 {
507         obd_unregister_type(LUSTRE_OSC_NAME);
508 }
509
510 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
511 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
512 MODULE_LICENSE("GPL"); 
513
514 module_init(osc_init);
515 module_exit(osc_exit);
516