Whamcloud - gitweb
Prevent C-c and C-z from locking us up, and make most of our waits
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
37                           struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
40         *cl = osc->osc_ldlm_client;
41         *connection = osc->osc_conn;
42 }
43
44 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
45 {
46         struct osc_obd *osc = &obd->u.osc;
47         struct obd_import *import;
48         struct ptlrpc_request *request;
49         char *tmp = osc->osc_target_uuid;
50         int rc, size = sizeof(osc->osc_target_uuid);
51         ENTRY;
52
53         OBD_ALLOC(import, sizeof(*import)); 
54         if (!import)
55                 RETURN(-ENOMEM);
56                   
57         MOD_INC_USE_COUNT;
58         rc = class_connect(conn, obd);
59         if (rc)
60                 RETURN(rc); 
61
62         request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn, 
63                                   OST_CONNECT, 1, &size, &tmp);
64         if (!request)
65                 GOTO(out_disco, rc = -ENOMEM);
66
67         request->rq_replen = lustre_msg_size(0, NULL);
68
69         rc = ptlrpc_queue_wait(request);
70         rc = ptlrpc_check_status(request, rc);
71         if (rc) {
72                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
73                 GOTO(out, rc);
74         }
75
76         /* XXX: Make this a handle */
77         osc->osc_connh.addr = request->rq_repmsg->addr;
78         osc->osc_connh.cookie = request->rq_repmsg->cookie;
79
80         EXIT;
81  out:
82         ptlrpc_free_req(request);
83  out_disco:
84         if (rc) {
85                 class_disconnect(conn);
86                 MOD_DEC_USE_COUNT;
87         }
88         return rc;
89 }
90
91 static int osc_disconnect(struct lustre_handle *conn)
92 {
93         struct ptlrpc_request *request;
94         struct ptlrpc_client *cl;
95         struct ptlrpc_connection *connection;
96         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
97         int rc;
98         ENTRY;
99
100         osc_con2cl(conn, &cl, &connection);
101         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
102                                   OST_DISCONNECT, 0, NULL, NULL);
103         if (!request)
104                 RETURN(-ENOMEM);
105         request->rq_replen = lustre_msg_size(0, NULL);
106
107         rc = ptlrpc_queue_wait(request);
108         if (rc) 
109                 GOTO(out, rc);
110         rc = class_disconnect(conn);
111         if (!rc)
112                 MOD_DEC_USE_COUNT;
113
114  out:
115         ptlrpc_free_req(request);
116         return rc;
117 }
118
119 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
120 {
121         struct ptlrpc_request *request;
122         struct ptlrpc_client *cl;
123         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
124         struct ptlrpc_connection *connection;
125         struct ost_body *body;
126         int rc, size = sizeof(*body);
127         ENTRY;
128
129         osc_con2cl(conn, &cl, &connection);
130         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
131                                    OST_GETATTR, 1, &size, NULL);
132         if (!request)
133                 RETURN(-ENOMEM);
134
135         body = lustre_msg_buf(request->rq_reqmsg, 0);
136         memcpy(&body->oa, oa, sizeof(*oa));
137         body->oa.o_valid = ~0;
138
139         request->rq_replen = lustre_msg_size(1, &size);
140
141         rc = ptlrpc_queue_wait(request);
142         rc = ptlrpc_check_status(request, rc);
143         if (rc) {
144                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
145                 GOTO(out, rc);
146         }
147
148         body = lustre_msg_buf(request->rq_repmsg, 0);
149         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
150         if (oa)
151                 memcpy(oa, &body->oa, sizeof(*oa));
152
153         EXIT;
154  out:
155         ptlrpc_free_req(request);
156         return rc;
157 }
158
159 static int osc_open(struct lustre_handle *conn, struct obdo *oa)
160 {
161         struct ptlrpc_request *request;
162         struct ptlrpc_client *cl;
163         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
164         struct ptlrpc_connection *connection;
165         struct ost_body *body;
166         int rc, size = sizeof(*body);
167         ENTRY;
168
169         osc_con2cl(conn, &cl, &connection);
170         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
171                                    OST_OPEN, 1, &size, NULL);
172         if (!request)
173                 RETURN(-ENOMEM);
174
175         body = lustre_msg_buf(request->rq_reqmsg, 0);
176         memcpy(&body->oa, oa, sizeof(*oa));
177         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
178
179         request->rq_replen = lustre_msg_size(1, &size);
180
181         rc = ptlrpc_queue_wait(request);
182         rc = ptlrpc_check_status(request, rc);
183         if (rc)
184                 GOTO(out, rc);
185
186         body = lustre_msg_buf(request->rq_repmsg, 0);
187         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
188         if (oa)
189                 memcpy(oa, &body->oa, sizeof(*oa));
190
191         EXIT;
192  out:
193         ptlrpc_free_req(request);
194         return rc;
195 }
196
197 static int osc_close(struct lustre_handle *conn, struct obdo *oa)
198 {
199         struct ptlrpc_request *request;
200         struct ptlrpc_client *cl;
201         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
202         struct ptlrpc_connection *connection;
203         struct ost_body *body;
204         int rc, size = sizeof(*body);
205         ENTRY;
206
207         osc_con2cl(conn, &cl, &connection);
208         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
209                                    OST_CLOSE, 1, &size, NULL);
210         if (!request)
211                 RETURN(-ENOMEM);
212
213         body = lustre_msg_buf(request->rq_reqmsg, 0);
214         memcpy(&body->oa, oa, sizeof(*oa));
215
216         request->rq_replen = lustre_msg_size(1, &size);
217
218         rc = ptlrpc_queue_wait(request);
219         rc = ptlrpc_check_status(request, rc);
220         if (rc)
221                 GOTO(out, rc);
222
223         body = lustre_msg_buf(request->rq_repmsg, 0);
224         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
225         if (oa)
226                 memcpy(oa, &body->oa, sizeof(*oa));
227
228         EXIT;
229  out:
230         ptlrpc_free_req(request);
231         return rc;
232 }
233
234 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
235 {
236         struct ptlrpc_request *request;
237         struct ptlrpc_client *cl;
238         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
239         struct ptlrpc_connection *connection;
240         struct ost_body *body;
241         int rc, size = sizeof(*body);
242         ENTRY;
243
244         osc_con2cl(conn, &cl, &connection);
245         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
246                                   OST_SETATTR, 1, &size, NULL);
247         if (!request)
248                 RETURN(-ENOMEM);
249
250         body = lustre_msg_buf(request->rq_reqmsg, 0);
251         memcpy(&body->oa, oa, sizeof(*oa));
252
253         request->rq_replen = lustre_msg_size(1, &size);
254
255         rc = ptlrpc_queue_wait(request);
256         rc = ptlrpc_check_status(request, rc);
257         GOTO(out, rc);
258
259  out:
260         ptlrpc_free_req(request);
261         return rc;
262 }
263
264 static int osc_create(struct lustre_handle *conn, struct obdo *oa)
265 {
266         struct ptlrpc_request *request;
267         struct ptlrpc_client *cl;
268         struct ptlrpc_connection *connection;
269         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
270         struct ost_body *body;
271         struct mds_objid *objid;
272         struct lov_object_id *lov_id;
273         int rc, size = sizeof(*body);
274         ENTRY;
275
276         if (!oa) {
277                 CERROR("oa NULL\n");
278                 RETURN(-EINVAL);
279         }
280         osc_con2cl(conn, &cl, &connection);
281         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
282                                   OST_CREATE, 1, &size, NULL);
283         if (!request)
284                 RETURN(-ENOMEM);
285
286         body = lustre_msg_buf(request->rq_reqmsg, 0);
287         memcpy(&body->oa, oa, sizeof(*oa));
288
289         request->rq_replen = lustre_msg_size(1, &size);
290
291         rc = ptlrpc_queue_wait(request);
292         rc = ptlrpc_check_status(request, rc);
293         if (rc)
294                 GOTO(out, rc);
295
296         body = lustre_msg_buf(request->rq_repmsg, 0);
297         memcpy(oa, &body->oa, sizeof(*oa));
298
299         memset(oa->o_inline, 0, sizeof(oa->o_inline));
300         objid = (struct mds_objid *)oa->o_inline;
301         objid->mo_lov_md.lmd_object_id = oa->o_id;
302         objid->mo_lov_md.lmd_stripe_count = 1;
303         lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
304         lov_id->l_device_id = 0;
305         lov_id->l_object_id = oa->o_id;
306
307         EXIT;
308  out:
309         ptlrpc_free_req(request);
310         return rc;
311 }
312
313 static int osc_punch(struct lustre_handle *conn, struct obdo *oa, obd_size count,
314                      obd_off offset)
315 {
316         struct ptlrpc_request *request;
317         struct ptlrpc_client *cl;
318         struct ptlrpc_connection *connection;
319         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
320         struct ost_body *body;
321         int rc, size = sizeof(*body);
322         ENTRY;
323
324         if (!oa) {
325                 CERROR("oa NULL\n");
326                 RETURN(-EINVAL);
327         }
328         osc_con2cl(conn, &cl, &connection);
329         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
330                                    OST_PUNCH, 1, &size, NULL);
331         if (!request)
332                 RETURN(-ENOMEM);
333
334         body = lustre_msg_buf(request->rq_reqmsg, 0);
335         memcpy(&body->oa, oa, sizeof(*oa));
336         body->oa.o_blocks = count;
337         body->oa.o_valid |= OBD_MD_FLBLOCKS;
338
339         request->rq_replen = lustre_msg_size(1, &size);
340
341         rc = ptlrpc_queue_wait(request);
342         rc = ptlrpc_check_status(request, rc);
343         if (rc)
344                 GOTO(out, rc);
345
346         body = lustre_msg_buf(request->rq_repmsg, 0);
347         memcpy(oa, &body->oa, sizeof(*oa));
348
349         EXIT;
350  out:
351         ptlrpc_free_req(request);
352         return rc;
353 }
354
355 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa)
356 {
357         struct ptlrpc_request *request;
358         struct ptlrpc_client *cl;
359         struct ptlrpc_connection *connection;
360         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
361         struct ost_body *body;
362         int rc, size = sizeof(*body);
363         ENTRY;
364
365         if (!oa) {
366                 CERROR("oa NULL\n");
367                 RETURN(-EINVAL);
368         }
369         osc_con2cl(conn, &cl, &connection);
370         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
371                                    OST_DESTROY, 1, &size, NULL);
372         if (!request)
373                 RETURN(-ENOMEM);
374
375         body = lustre_msg_buf(request->rq_reqmsg, 0);
376         memcpy(&body->oa, oa, sizeof(*oa));
377         body->oa.o_valid = ~0;
378
379         request->rq_replen = lustre_msg_size(1, &size);
380
381         rc = ptlrpc_queue_wait(request);
382         rc = ptlrpc_check_status(request, rc);
383         if (rc)
384                 GOTO(out, rc);
385
386         body = lustre_msg_buf(request->rq_repmsg, 0);
387         memcpy(oa, &body->oa, sizeof(*oa));
388
389         EXIT;
390  out:
391         ptlrpc_free_req(request);
392         return rc;
393 }
394
395 struct osc_brw_cb_data {
396         struct page **buf;
397         struct ptlrpc_request *req;
398         bulk_callback_t callback;
399         void *cb_data;
400 };
401
402 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
403 {
404         struct osc_brw_cb_data *cb_data = data;
405
406         if (desc->b_flags & PTL_RPC_FL_INTR)
407                 CERROR("got signal\n");
408
409         (cb_data->callback)(desc, cb_data->cb_data);
410
411         ptlrpc_free_bulk(desc);
412         ptlrpc_free_req(cb_data->req);
413
414         OBD_FREE(cb_data, sizeof(*cb_data));
415 }
416
417 static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
418                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
419                         obd_size *count, obd_off *offset, obd_flag *flags,
420                         bulk_callback_t callback)
421 {
422         struct ptlrpc_client *cl;
423         struct ptlrpc_connection *connection;
424         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
425         struct ptlrpc_request *request;
426         struct ost_body *body;
427         struct list_head *tmp;
428         int pages, rc, i, j, size[3] = {sizeof(*body)};
429         void *ptr1, *ptr2;
430         struct ptlrpc_bulk_desc *desc;
431         ENTRY;
432
433         size[1] = num_oa * sizeof(struct obd_ioobj);
434         pages = 0;
435         for (i = 0; i < num_oa; i++)
436                 pages += oa_bufs[i];
437         size[2] = pages * sizeof(struct niobuf_remote);
438
439         osc_con2cl(conn, &cl, &connection);
440         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
441                                   OST_BRW, 3, size, NULL);
442         if (!request)
443                 GOTO(out, rc = -ENOMEM);
444
445         body = lustre_msg_buf(request->rq_reqmsg, 0);
446         body->data = OBD_BRW_READ;
447
448         desc = ptlrpc_prep_bulk(connection);
449         if (!desc)
450                 GOTO(out2, rc = -ENOMEM);
451         desc->b_portal = OST_BULK_PORTAL;
452
453         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
454         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
455         for (pages = 0, i = 0; i < num_oa; i++) {
456                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
457                 /* FIXME: this inner loop is wrong for multiple OAs */
458                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
459                         struct ptlrpc_bulk_page *bulk;
460                         bulk = ptlrpc_prep_bulk_page(desc);
461                         if (bulk == NULL)
462                                 GOTO(out3, rc = -ENOMEM);
463
464                         spin_lock(&connection->c_lock);
465                         bulk->b_xid = ++connection->c_xid_out;
466                         spin_unlock(&connection->c_lock);
467
468                         bulk->b_buf = kmap(buf[pages]);
469                         bulk->b_page = buf[pages];
470                         bulk->b_buflen = PAGE_SIZE;
471                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
472                                         flags[pages], bulk->b_xid);
473                 }
474         }
475
476         rc = ptlrpc_register_bulk(desc);
477         if (rc)
478                 GOTO(out3, rc);
479
480         request->rq_replen = lustre_msg_size(1, size);
481         rc = ptlrpc_queue_wait(request);
482         rc = ptlrpc_check_status(request, rc);
483         if (rc)
484                 ptlrpc_abort_bulk(desc);
485         GOTO(out3, rc);
486
487  out3:
488         list_for_each(tmp, &desc->b_page_list) {
489                 struct ptlrpc_bulk_page *bulk;
490                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
491                 if (bulk->b_page != NULL)
492                         kunmap(bulk->b_page);
493         }
494         ptlrpc_free_bulk(desc);
495  out2:
496         ptlrpc_free_req(request);
497  out:
498         return rc;
499 }
500
501 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
502 {
503         struct osc_brw_cb_data *cb_data = data;
504         int i;
505         ENTRY;
506
507         if (desc->b_flags & PTL_RPC_FL_INTR)
508                 CERROR("got signal\n");
509
510         for (i = 0; i < desc->b_page_count; i++)
511                 kunmap(cb_data->buf[i]);
512
513         (cb_data->callback)(desc, cb_data->cb_data);
514
515         ptlrpc_free_bulk(desc);
516         ptlrpc_free_req(cb_data->req);
517
518         OBD_FREE(cb_data, sizeof(*cb_data));
519         EXIT;
520 }
521
522 static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
523                          struct obdo **oa, obd_count *oa_bufs,
524                          struct page **pagearray, obd_size *count,
525                          obd_off *offset, obd_flag *flags,
526                          bulk_callback_t callback)
527 {
528         struct ptlrpc_client *cl;
529         struct ptlrpc_connection *connection;
530         struct ptlrpc_request *request;
531         struct ptlrpc_bulk_desc *desc;
532         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
533         struct obd_ioobj ioo;
534         struct ost_body *body;
535         struct niobuf_local *local;
536         struct niobuf_remote *remote;
537         struct osc_brw_cb_data *cb_data;
538         long pages;
539         int rc, i, j, size[3] = {sizeof(*body)};
540         void *ptr1, *ptr2;
541         ENTRY;
542
543         size[1] = num_oa * sizeof(ioo);
544         pages = 0;
545         for (i = 0; i < num_oa; i++)
546                 pages += oa_bufs[i];
547         size[2] = pages * sizeof(*remote);
548
549         OBD_ALLOC(local, pages * sizeof(*local));
550         if (local == NULL)
551                 RETURN(-ENOMEM);
552
553         osc_con2cl(conn, &cl, &connection);
554         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
555                                   OST_BRW, 3, size, NULL);
556         if (!request)
557                 GOTO(out, rc = -ENOMEM);
558         body = lustre_msg_buf(request->rq_reqmsg, 0);
559         body->data = OBD_BRW_WRITE;
560
561         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
562         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
563         for (pages = 0, i = 0; i < num_oa; i++) {
564                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
565                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
566                         local[pages].addr = kmap(pagearray[pages]);
567                         local[pages].offset = offset[pages];
568                         local[pages].len = count[pages];
569                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
570                                         flags[pages], 0);
571                 }
572         }
573
574         size[1] = pages * sizeof(struct niobuf_remote);
575         request->rq_replen = lustre_msg_size(2, size);
576
577         rc = ptlrpc_queue_wait(request);
578         rc = ptlrpc_check_status(request, rc);
579         if (rc)
580                 GOTO(out2, rc);
581
582         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
583         if (ptr2 == NULL)
584                 GOTO(out2, rc = -EINVAL);
585
586         if (request->rq_repmsg->buflens[1] !=
587             pages * sizeof(struct niobuf_remote)) {
588                 CERROR("buffer length wrong (%d vs. %ld)\n",
589                        request->rq_repmsg->buflens[1],
590                        pages * sizeof(struct niobuf_remote));
591                 GOTO(out2, rc = -EINVAL);
592         }
593
594         desc = ptlrpc_prep_bulk(connection);
595         desc->b_portal = OSC_BULK_PORTAL;
596         if (callback) {
597                 desc->b_cb = brw_write_finish;
598                 OBD_ALLOC(cb_data, sizeof(*cb_data));
599                 cb_data->buf = pagearray;
600                 cb_data->callback = callback;
601                 desc->b_cb_data = cb_data;
602         }
603
604         for (pages = 0, i = 0; i < num_oa; i++) {
605                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
606                         struct ptlrpc_bulk_page *page;
607
608                         ost_unpack_niobuf(&ptr2, &remote);
609
610                         page = ptlrpc_prep_bulk_page(desc);
611                         if (page == NULL)
612                                 GOTO(out3, rc = -ENOMEM);
613
614                         page->b_buf = (void *)(unsigned long)local[pages].addr;
615                         page->b_buflen = local[pages].len;
616                         page->b_xid = remote->xid;
617                 }
618         }
619
620         if (desc->b_page_count != pages)
621                 LBUG();
622
623         rc = ptlrpc_send_bulk(desc);
624         if (rc)
625                 GOTO(out3, rc);
626         if (callback)
627                 GOTO(out, rc);
628
629         /* If there's no callback function, sleep here until complete. */
630         l_wait_event_killable(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
631         if (desc->b_flags & PTL_RPC_FL_INTR)
632                 rc = -EINTR;
633
634         GOTO(out3, rc);
635
636  out3:
637         ptlrpc_free_bulk(desc);
638  out2:
639         ptlrpc_free_req(request);
640         for (pages = 0, i = 0; i < num_oa; i++)
641                 for (j = 0; j < oa_bufs[i]; j++, pages++)
642                         kunmap(pagearray[pages]);
643  out:
644         OBD_FREE(local, pages * sizeof(*local));
645
646         return rc;
647 }
648
649 static int osc_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
650                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
651                    obd_size *count, obd_off *offset, obd_flag *flags,
652                    void *callback)
653 {
654         if (num_oa != 1)
655                 LBUG();
656
657         if (cmd & OBD_BRW_WRITE)
658                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
659                                      offset, flags, (bulk_callback_t)callback);
660         else
661                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
662                                     offset, flags, (bulk_callback_t)callback);
663 }
664
665 static int osc_enqueue(struct lustre_handle *oconn,
666                        struct lustre_handle *parent_lock, __u64 *res_id,
667                        __u32 type, void *extentp, int extent_len, __u32 mode,
668                        int *flags, void *callback, void *data, int datalen,
669                        struct lustre_handle *lockh)
670 {
671         struct obd_device *obddev = class_conn2obd(oconn);
672         struct osc_obd *osc = &obddev->u.osc;
673         struct ptlrpc_connection *conn;
674         struct ptlrpc_client *cl;
675         struct ldlm_extent *extent = extentp;
676         int rc;
677         __u32 mode2;
678
679         /* Filesystem locks are given a bit of special treatment: first we
680          * fixup the lock to start and end on page boundaries. */
681         extent->start &= PAGE_MASK;
682         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
683
684         /* Next, search for already existing extent locks that will cover us */
685         osc_con2dlmcl(oconn, &cl, &conn);
686         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
687                              sizeof(extent), mode, lockh);
688         if (rc == 1) {
689                 /* We already have a lock, and it's referenced */
690                 return 0;
691         }
692
693         /* Next, search for locks that we can upgrade (if we're trying to write)
694          * or are more than we need (if we're trying to read).  Because the VFS
695          * and page cache already protect us locally, lots of readers/writers
696          * can share a single PW lock. */
697         if (mode == LCK_PW)
698                 mode2 = LCK_PR;
699         else
700                 mode2 = LCK_PW;
701
702         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
703                              sizeof(extent), mode2, lockh);
704         if (rc == 1) {
705                 int flags;
706                 /* FIXME: This is not incredibly elegant, but it might
707                  * be more elegant than adding another parameter to
708                  * lock_match.  I want a second opinion. */
709                 ldlm_lock_addref(lockh, mode);
710                 ldlm_lock_decref(lockh, mode2);
711
712                 if (mode == LCK_PR)
713                         return 0;
714
715                 rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh, 
716                                       mode, &flags);
717                 if (rc)
718                         LBUG();
719
720                 return rc;
721         }
722
723         rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh, 
724                               NULL, obddev->obd_namespace,
725                               parent_lock, res_id, type, extent, sizeof(extent),
726                               mode, flags, callback, data, datalen, lockh);
727         return rc;
728 }
729
730 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
731                       struct lustre_handle *lockh)
732 {
733         ENTRY;
734
735         ldlm_lock_decref(lockh, mode);
736
737         RETURN(0);
738 }
739
740 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
741 {
742         struct obd_ioctl_data* data = buf;
743         struct osc_obd *osc = &obddev->u.osc;
744         char server_uuid[37];
745         int rc;
746         ENTRY;
747
748         if (data->ioc_inllen1 < 1) {
749                 CERROR("osc setup requires a TARGET UUID\n");
750                 RETURN(-EINVAL);
751         }
752
753         if (data->ioc_inllen1 > 37) {
754                 CERROR("osc TARGET UUID must be less than 38 characters\n");
755                 RETURN(-EINVAL);
756         }
757
758         if (data->ioc_inllen2 < 1) {
759                 CERROR("osc setup requires a SERVER UUID\n");
760                 RETURN(-EINVAL);
761         }
762
763         if (data->ioc_inllen2 > 37) {
764                 CERROR("osc SERVER UUID must be less than 38 characters\n");
765                 RETURN(-EINVAL);
766         }
767
768         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
769         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
770                                                    sizeof(server_uuid)));
771
772         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
773         if (!osc->osc_conn)
774                 RETURN(-ENOENT);
775
776         obddev->obd_namespace =
777                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
778         if (obddev->obd_namespace == NULL)
779                 GOTO(out_conn, rc = -ENOMEM);
780
781         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
782         if (osc->osc_client == NULL)
783                 GOTO(out_ns, rc = -ENOMEM);
784
785         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
786         if (osc->osc_ldlm_client == NULL)
787                 GOTO(out_client, rc = -ENOMEM);
788
789         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
790                            osc->osc_client);
791         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
792                            osc->osc_ldlm_client);
793         osc->osc_client->cli_name = "osc";
794         osc->osc_ldlm_client->cli_name = "ldlm";
795
796         MOD_INC_USE_COUNT;
797         RETURN(0);
798
799  out_client:
800         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
801  out_ns:
802         ldlm_namespace_free(obddev->obd_namespace);
803  out_conn:
804         ptlrpc_put_connection(osc->osc_conn);
805         return rc;
806 }
807
808 static int osc_cleanup(struct obd_device * obddev)
809 {
810         struct osc_obd *osc = &obddev->u.osc;
811
812         ldlm_namespace_free(obddev->obd_namespace);
813
814         ptlrpc_cleanup_client(osc->osc_client);
815         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
816         ptlrpc_cleanup_client(osc->osc_ldlm_client);
817         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
818         ptlrpc_put_connection(osc->osc_conn);
819
820         MOD_DEC_USE_COUNT;
821         return 0;
822 }
823
824 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
825 {
826         struct ptlrpc_request *request;
827         struct ptlrpc_client *cl;
828         struct ptlrpc_connection *connection;
829         struct obd_statfs *osfs;
830         int rc, size = sizeof(*osfs);
831         ENTRY;
832
833         osc_con2cl(conn, &cl, &connection);
834         request = ptlrpc_prep_req(cl, connection, OST_STATFS, 0, NULL, NULL);
835         if (!request)
836                 RETURN(-ENOMEM);
837
838         request->rq_replen = lustre_msg_size(1, &size);
839
840         rc = ptlrpc_queue_wait(request);
841         rc = ptlrpc_check_status(request, rc);
842         if (rc) {
843                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
844                 GOTO(out, rc);
845         }
846
847         osfs = lustre_msg_buf(request->rq_repmsg, 0);
848         obd_statfs_unpack(osfs, sfs);
849
850         EXIT;
851  out:
852         ptlrpc_free_req(request);
853         return rc;
854 }
855
856 struct obd_ops osc_obd_ops = {
857         o_setup:        osc_setup,
858         o_cleanup:      osc_cleanup,
859         o_statfs:       osc_statfs,
860         o_create:       osc_create,
861         o_destroy:      osc_destroy,
862         o_getattr:      osc_getattr,
863         o_setattr:      osc_setattr,
864         o_open:         osc_open,
865         o_close:        osc_close,
866         o_connect:      osc_connect,
867         o_disconnect:   osc_disconnect,
868         o_brw:          osc_brw,
869         o_punch:        osc_punch,
870         o_enqueue:      osc_enqueue,
871         o_cancel:       osc_cancel
872 };
873
874 static int __init osc_init(void)
875 {
876         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
877 }
878
879 static void __exit osc_exit(void)
880 {
881         class_unregister_type(LUSTRE_OSC_NAME);
882 }
883
884 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
885 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
886 MODULE_LICENSE("GPL");
887
888 module_init(osc_init);
889 module_exit(osc_exit);