Whamcloud - gitweb
- Added DEBUG_SUBSYSTEMs
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* 
2  * Copryright (C) 2001 Cluster File Systems, Inc.
3  *
4  *  This code is issued under the GNU General Public License.
5  *  See the file COPYING in this distribution
6  *
7  *  Author Peter Braam <braam@clusterfs.com>
8  * 
9  *  This server is single threaded at present (but can easily be multi
10  *  threaded). For testing and management it is treated as an
11  *  obd_device, although it does not export a full OBD method table
12  *  (the requests are coming in over the wire, so object target
13  *  modules do not have a full method table.)
14  * 
15  */
16
17 #define EXPORT_SYMTAB
18
19 #include <linux/config.h>
20 #include <linux/module.h>
21 #include <linux/kernel.h>
22 #include <linux/mm.h>
23 #include <linux/string.h>
24 #include <linux/stat.h>
25 #include <linux/errno.h>
26 #include <linux/locks.h>
27 #include <linux/unistd.h>
28
29 #include <asm/system.h>
30 #include <asm/uaccess.h>
31
32 #include <linux/fs.h>
33 #include <linux/stat.h>
34 #include <asm/uaccess.h>
35 #include <asm/segment.h>
36 #include <linux/miscdevice.h>
37
38 #define DEBUG_SUBSYSTEM S_OSC
39
40 #include <linux/obd_support.h>
41 #include <linux/obd_class.h>
42 #include <linux/lustre_lib.h>
43 #include <linux/lustre_idl.h>
44
45 extern int ost_queue_req(struct obd_device *, struct ptlrpc_request *);
46
47 /* FIXME: this belongs in some sort of service struct */
48 static int osc_xid = 1;
49
50 struct ptlrpc_request *ost_prep_req(int opcode, int buflen1, char *buf1, 
51                                  int buflen2, char *buf2)
52 {
53         struct ptlrpc_request *request;
54         int rc;
55         ENTRY; 
56
57         OBD_ALLOC(request, sizeof(*request));
58         if (!request) { 
59                 printk("osc_prep_req: request allocation out of memory\n");
60                 return NULL;
61         }
62
63         memset(request, 0, sizeof(*request));
64         request->rq_xid = osc_xid++;
65
66         rc = ost_pack_req(buf1, buflen1,  buf2, buflen2,
67                           &request->rq_reqhdr, &request->rq_req.ost, 
68                           &request->rq_reqlen, &request->rq_reqbuf);
69         if (rc) { 
70                 printk("llight request: cannot pack request %d\n", rc); 
71                 return NULL;
72         }
73         request->rq_reqhdr->opc = opcode;
74
75         EXIT;
76         return request;
77 }
78
79 /* XXX: unify with mdc_queue_wait */
80 extern int osc_queue_wait(struct obd_conn *conn, struct ptlrpc_request *req)
81 {
82         struct obd_device *client = conn->oc_dev;
83         struct lustre_peer *peer = &conn->oc_dev->u.osc.osc_peer;
84         int rc;
85
86         ENTRY;
87
88         /* set the connection id */
89         req->rq_req.ost->connid = conn->oc_id;
90
91         /* XXX fix the race here (wait_for_event?)*/
92         if (peer == NULL) {
93                 /* Local delivery */
94                 CDEBUG(D_INODE, "\n");
95                 rc = ost_queue_req(client, req); 
96         } else {
97                 /* Remote delivery via portals. */
98                 req->rq_req_portal = OST_REQUEST_PORTAL;
99                 req->rq_reply_portal = OST_REPLY_PORTAL;
100                 rc = ptl_send_rpc(req, peer);
101         }
102         if (rc) { 
103                 printk(__FUNCTION__ ": error %d, opcode %d\n", rc, 
104                        req->rq_reqhdr->opc); 
105                 return -rc;
106         }
107
108         CDEBUG(D_INODE, "tgt at %p, conn id %d, opcode %d request at: %p\n", 
109                &conn->oc_dev->u.osc.osc_tgt->u.ost, 
110                conn->oc_id, req->rq_reqhdr->opc, req);
111
112         /* wait for the reply */
113         init_waitqueue_head(&req->rq_wait_for_rep);
114         CDEBUG(D_INODE, "-- sleeping\n");
115         interruptible_sleep_on(&req->rq_wait_for_rep);
116         CDEBUG(D_INODE, "-- done\n");
117
118         rc = ost_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
119                             &req->rq_rep.ost); 
120         if (rc) {
121                 printk(__FUNCTION__ ": mds_unpack_rep failed: %d\n", rc);
122                 return rc;
123         }
124
125         if ( req->rq_rephdr->status == 0 )
126                 CDEBUG(D_INODE, "buf %p len %d status %d\n", 
127                        req->rq_repbuf, req->rq_replen, 
128                        req->rq_rephdr->status); 
129
130         EXIT;
131         return 0;
132 }
133
134 static void osc_free_req(struct ptlrpc_request *request)
135 {
136         OBD_FREE(request, sizeof(*request));
137 }
138
139 static int osc_connect(struct obd_conn *conn)
140 {
141         struct ptlrpc_request *request;
142         int rc; 
143         ENTRY;
144         
145         request = ost_prep_req(OST_CONNECT, 0, NULL, 0, NULL);
146         if (!request) { 
147                 printk(__FUNCTION__ ": cannot pack req!\n"); 
148                 return -ENOMEM;
149         }
150
151         request->rq_replen = 
152                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
153
154         rc = osc_queue_wait(conn, request);
155         if (rc) { 
156                 EXIT;
157                 goto out;
158         }
159       
160         CDEBUG(D_INODE, "received connid %d\n", request->rq_rep.ost->connid); 
161
162         conn->oc_id = request->rq_rep.ost->connid;
163  out:
164         osc_free_req(request);
165         EXIT;
166         return rc;
167 }
168
169 static int osc_disconnect(struct obd_conn *conn)
170 {
171         struct ptlrpc_request *request;
172         int rc; 
173         ENTRY;
174         
175         request = ost_prep_req(OST_DISCONNECT, 0, NULL, 0, NULL);
176         if (!request) { 
177                 printk(__FUNCTION__ ": cannot pack req!\n"); 
178                 return -ENOMEM;
179         }
180
181         request->rq_replen = 
182                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
183
184         rc = osc_queue_wait(conn, request);
185         if (rc) { 
186                 EXIT;
187                 goto out;
188         }
189  out:
190         osc_free_req(request);
191         EXIT;
192         return rc;
193 }
194
195
196 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
197 {
198         struct ptlrpc_request *request;
199         int rc; 
200
201         request = ost_prep_req(OST_GETATTR, 0, NULL, 0, NULL);
202         if (!request) { 
203                 printk(__FUNCTION__ ": cannot pack req!\n"); 
204                 return -ENOMEM;
205         }
206         
207         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
208         request->rq_req.ost->oa.o_valid = ~0;
209         request->rq_replen = 
210                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
211         
212         rc = osc_queue_wait(conn, request);
213         if (rc) { 
214                 EXIT;
215                 goto out;
216         }
217
218         CDEBUG(D_INODE, "mode: %o\n", request->rq_rep.ost->oa.o_mode); 
219         if (oa) { 
220                 memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
221         }
222
223  out:
224         osc_free_req(request);
225         return 0;
226 }
227
228 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
229 {
230         struct ptlrpc_request *request;
231         int rc; 
232
233         request = ost_prep_req(OST_SETATTR, 0, NULL, 0, NULL);
234         if (!request) { 
235                 printk(__FUNCTION__ ": cannot pack req!\n"); 
236                 return -ENOMEM;
237         }
238         
239         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
240         request->rq_replen = 
241                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
242         
243         rc = osc_queue_wait(conn, request);
244         if (rc) { 
245                 EXIT;
246                 goto out;
247         }
248
249  out:
250         osc_free_req(request);
251         return 0;
252 }
253
254 static int osc_create(struct obd_conn *conn, struct obdo *oa)
255 {
256         struct ptlrpc_request *request;
257         int rc; 
258
259         if (!oa) { 
260                 printk(__FUNCTION__ ": oa NULL\n"); 
261         }
262         request = ost_prep_req(OST_CREATE, 0, NULL, 0, NULL);
263         if (!request) { 
264                 printk("osc_connect: cannot pack req!\n"); 
265                 return -ENOMEM;
266         }
267         
268         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
269         request->rq_req.ost->oa.o_valid = ~0;
270         request->rq_replen = 
271                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
272         
273         rc = osc_queue_wait(conn, request);
274         if (rc) { 
275                 EXIT;
276                 goto out;
277         }
278         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
279
280  out:
281         osc_free_req(request);
282         return 0;
283 }
284
285 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
286 {
287         struct ptlrpc_request *request;
288         int rc; 
289
290         if (!oa) { 
291                 printk(__FUNCTION__ ": oa NULL\n"); 
292         }
293         request = ost_prep_req(OST_DESTROY, 0, NULL, 0, NULL);
294         if (!request) { 
295                 printk("osc_connect: cannot pack req!\n"); 
296                 return -ENOMEM;
297         }
298         
299         memcpy(&request->rq_req.ost->oa, oa, sizeof(*oa));
300         request->rq_req.ost->oa.o_valid = ~0;
301         request->rq_replen = 
302                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep);
303         
304         rc = osc_queue_wait(conn, request);
305         if (rc) { 
306                 EXIT;
307                 goto out;
308         }
309         memcpy(oa, &request->rq_rep.ost->oa, sizeof(*oa));
310
311  out:
312         osc_free_req(request);
313         return 0;
314 }
315
316
317 /* mount the file system (secretly) */
318 static int osc_setup(struct obd_device *obddev, obd_count len,
319                         void *buf)
320                         
321 {
322         struct obd_ioctl_data* data = buf;
323         struct osc_obd *osc = &obddev->u.osc;
324         ENTRY;
325
326         if (data->ioc_dev >= 0 && data->ioc_dev < MAX_OBD_DEVICES) {
327                 /* This is a local connection */
328                 osc->osc_tgt = &obd_dev[data->ioc_dev];
329
330                 printk("OSC: tgt %d ost at %p\n", data->ioc_dev,
331                        &osc->osc_tgt->u.ost);
332                 if ( ! (osc->osc_tgt->obd_flags & OBD_ATTACHED) || 
333                      ! (osc->osc_tgt->obd_flags & OBD_SET_UP) ){
334                         printk("device not attached or not set up (%d)\n", 
335                                data->ioc_dev);
336                         EXIT;
337                         return -EINVAL;
338                 }
339         } else {
340                 int err;
341                 /* This is a remote connection using Portals */
342
343                 /* XXX: this should become something like ioc_inlbuf1 */
344                 err = kportal_uuid_to_peer("ost", &osc->osc_peer);
345                 if (err != 0) {
346                         printk("Cannot find 'ost' peer.\n");
347                         EXIT;
348                         return -EINVAL;
349                 }
350         }
351
352         MOD_INC_USE_COUNT;
353         EXIT;
354         return 0;
355
356
357 void osc_sendpage(struct niobuf *dst, struct niobuf *src)
358 {
359         memcpy((char *)(unsigned long)dst->addr,  
360                (char *)(unsigned long)src->addr, 
361                src->len);
362         return;
363 }
364
365
366 int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa,
367               struct obdo **oa, obd_count *oa_bufs, struct page **buf,
368               obd_size *count, obd_off *offset, obd_flag *flags)
369 {
370         struct ptlrpc_request *request;
371         int rc; 
372         struct obd_ioobj ioo;
373         struct niobuf src;
374         int size1, size2 = 0; 
375         void *ptr1, *ptr2;
376         int i, j, n;
377
378         size1 = num_oa * sizeof(ioo); 
379         for (i = 0; i < num_oa; i++) { 
380                 size2 += oa_bufs[i] * sizeof(src);
381         }
382
383         request = ost_prep_req(OST_BRW, size1, NULL, size2, NULL);
384         if (!request) { 
385                 printk("osc_connect: cannot pack req!\n"); 
386                 return -ENOMEM;
387         }
388
389         n = 0;
390         request->rq_req.ost->cmd = rw;
391         ptr1 = ost_req_buf1(request->rq_req.ost);
392         ptr2 = ost_req_buf2(request->rq_req.ost);
393         for (i=0; i < num_oa; i++) { 
394                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); 
395                 for (j = 0 ; j < oa_bufs[i] ; j++) { 
396                         ost_pack_niobuf(&ptr2, kmap(buf[n]), offset[n],
397                                         count[n], flags[n]); 
398                         n++;
399                 }
400         }
401
402         request->rq_replen = 
403                 sizeof(struct ptlrep_hdr) + sizeof(struct ost_rep) + size2;
404
405         rc = osc_queue_wait(conn, request);
406         if (rc) { 
407                 EXIT;
408                 goto out;
409         }
410
411 #if 0
412         ptr2 = ost_rep_buf2(request->rq_rep.ost); 
413         if (request->rq_rep.ost->buflen2 != n * sizeof(struct niobuf)) { 
414                 printk(__FUNCTION__ ": buffer length wrong\n"); 
415                 goto out;
416         }
417
418         if (rw == OBD_BRW_READ)
419                 goto out;
420
421         for (i=0; i < num_oa; i++) { 
422                 for (j = 0 ; j < oa_bufs[i] ; j++) { 
423                         struct niobuf *dst;
424                         src.addr = (__u64)(unsigned long)buf[n];
425                         src.len = count[n];
426                         ost_unpack_niobuf(&ptr2, &dst);
427                         osc_sendpage(dst, &src);
428                         n++;
429                 }
430         }
431 #endif
432
433  out:
434         if (request->rq_rephdr)
435                 OBD_FREE(request->rq_rephdr, request->rq_replen);
436         n = 0;
437         for (i=0; i < num_oa; i++) { 
438                 for (j = 0 ; j < oa_bufs[i] ; j++) { 
439                         kunmap(buf[n]);
440                         n++;
441                 }
442         }
443
444         osc_free_req(request);
445         return 0;
446 }
447
448 static int osc_cleanup(struct obd_device * obddev)
449 {
450         MOD_DEC_USE_COUNT;
451         return 0;
452 }
453
454 struct obd_ops osc_obd_ops = { 
455         o_setup:   osc_setup,
456         o_cleanup: osc_cleanup, 
457         o_create: osc_create,
458         o_destroy: osc_destroy,
459         o_getattr: osc_getattr,
460         o_setattr: osc_setattr,
461         o_connect: osc_connect,
462         o_disconnect: osc_disconnect,
463         o_brw: osc_brw
464 };
465
466 static int __init osc_init(void)
467 {
468         obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
469         return 0;
470 }
471
472 static void __exit osc_exit(void)
473 {
474         obd_unregister_type(LUSTRE_OSC_NAME);
475 }
476
477 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
478 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
479 MODULE_LICENSE("GPL"); 
480
481 module_init(osc_init);
482 module_exit(osc_exit);
483