Whamcloud - gitweb
Fix a few compiler warnings.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_obd2cl(struct obd_device *obd, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &obd->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
37                        struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
40         *cl = osc->osc_client;
41         *connection = osc->osc_conn;
42 }
43
44 static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
45                           struct ptlrpc_connection **connection)
46 {
47         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
48         *cl = osc->osc_ldlm_client;
49         *connection = osc->osc_conn;
50 }
51
52 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
53 {
54         struct osc_obd *osc = &obd->u.osc;
55         struct obd_import *import;
56         struct ptlrpc_request *request;
57         struct ptlrpc_client *cl;
58         struct ptlrpc_connection *connection;
59         char *tmp = osc->osc_target_uuid;
60         int rc, size = sizeof(osc->osc_target_uuid);
61         ENTRY;
62
63         OBD_ALLOC(import, sizeof(*import)); 
64         if (!import)
65                 RETURN(-ENOMEM);
66                   
67         MOD_INC_USE_COUNT;
68         rc = class_connect(conn, obd);
69         if (rc) 
70                 RETURN(rc); 
71
72         osc_obd2cl(obd, &cl, &connection);
73         request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn, 
74                                   OST_CONNECT, 1, &size, &tmp);
75         if (!request)
76                 GOTO(out_disco, -ENOMEM);
77
78         request->rq_replen = lustre_msg_size(0, NULL);
79
80         rc = ptlrpc_queue_wait(request);
81         rc = ptlrpc_check_status(request, rc);
82         if (rc) {
83                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
84                 GOTO(out, rc);
85         }
86
87         /* XXX: Make this a handle */
88         osc->osc_connh.addr = request->rq_repmsg->addr;
89         osc->osc_connh.cookie = request->rq_repmsg->cookie;
90
91         EXIT;
92  out:
93         ptlrpc_free_req(request);
94  out_disco:
95         class_disconnect(conn); 
96         if (rc)
97                 MOD_DEC_USE_COUNT;
98         return rc;
99 }
100
101 static int osc_disconnect(struct lustre_handle *conn)
102 {
103         struct ptlrpc_request *request;
104         struct ptlrpc_client *cl;
105         struct ptlrpc_connection *connection;
106         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
107         int rc;
108         ENTRY;
109
110         osc_con2cl(conn, &cl, &connection);
111         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
112                                   OST_DISCONNECT, 0, NULL, NULL);
113         if (!request)
114                 RETURN(-ENOMEM);
115         request->rq_replen = lustre_msg_size(0, NULL);
116
117         rc = ptlrpc_queue_wait(request);
118         if (rc) 
119                 GOTO(out, rc);
120         rc = class_disconnect(conn);
121         if (!rc)
122                 MOD_DEC_USE_COUNT;
123
124  out:
125         ptlrpc_free_req(request);
126         return rc;
127 }
128
129 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
130 {
131         struct ptlrpc_request *request;
132         struct ptlrpc_client *cl;
133         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
134         struct ptlrpc_connection *connection;
135         struct ost_body *body;
136         int rc, size = sizeof(*body);
137         ENTRY;
138
139         osc_con2cl(conn, &cl, &connection);
140         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
141                                    OST_GETATTR, 1, &size, NULL);
142         if (!request)
143                 RETURN(-ENOMEM);
144
145         body = lustre_msg_buf(request->rq_reqmsg, 0);
146         memcpy(&body->oa, oa, sizeof(*oa));
147         body->oa.o_valid = ~0;
148
149         request->rq_replen = lustre_msg_size(1, &size);
150
151         rc = ptlrpc_queue_wait(request);
152         rc = ptlrpc_check_status(request, rc);
153         if (rc) {
154                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
155                 GOTO(out, rc);
156         }
157
158         body = lustre_msg_buf(request->rq_repmsg, 0);
159         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
160         if (oa)
161                 memcpy(oa, &body->oa, sizeof(*oa));
162
163         EXIT;
164  out:
165         ptlrpc_free_req(request);
166         return 0;
167 }
168
169 static int osc_open(struct lustre_handle *conn, struct obdo *oa)
170 {
171         struct ptlrpc_request *request;
172         struct ptlrpc_client *cl;
173         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
174         struct ptlrpc_connection *connection;
175         struct ost_body *body;
176         int rc, size = sizeof(*body);
177         ENTRY;
178
179         osc_con2cl(conn, &cl, &connection);
180         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
181                                    OST_OPEN, 1, &size, NULL);
182         if (!request)
183                 RETURN(-ENOMEM);
184
185         body = lustre_msg_buf(request->rq_reqmsg, 0);
186         memcpy(&body->oa, oa, sizeof(*oa));
187         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
188
189         request->rq_replen = lustre_msg_size(1, &size);
190
191         rc = ptlrpc_queue_wait(request);
192         rc = ptlrpc_check_status(request, rc);
193         if (rc)
194                 GOTO(out, rc);
195
196         body = lustre_msg_buf(request->rq_repmsg, 0);
197         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
198         if (oa)
199                 memcpy(oa, &body->oa, sizeof(*oa));
200
201         EXIT;
202  out:
203         ptlrpc_free_req(request);
204         return 0;
205 }
206
207 static int osc_close(struct lustre_handle *conn, struct obdo *oa)
208 {
209         struct ptlrpc_request *request;
210         struct ptlrpc_client *cl;
211         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
212         struct ptlrpc_connection *connection;
213         struct ost_body *body;
214         int rc, size = sizeof(*body);
215         ENTRY;
216
217         osc_con2cl(conn, &cl, &connection);
218         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
219                                    OST_CLOSE, 1, &size, NULL);
220         if (!request)
221                 RETURN(-ENOMEM);
222
223         body = lustre_msg_buf(request->rq_reqmsg, 0);
224         memcpy(&body->oa, oa, sizeof(*oa));
225
226         request->rq_replen = lustre_msg_size(1, &size);
227
228         rc = ptlrpc_queue_wait(request);
229         rc = ptlrpc_check_status(request, rc);
230         if (rc)
231                 GOTO(out, rc);
232
233         body = lustre_msg_buf(request->rq_repmsg, 0);
234         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
235         if (oa)
236                 memcpy(oa, &body->oa, sizeof(*oa));
237
238         EXIT;
239  out:
240         ptlrpc_free_req(request);
241         return 0;
242 }
243
244 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
245 {
246         struct ptlrpc_request *request;
247         struct ptlrpc_client *cl;
248         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
249         struct ptlrpc_connection *connection;
250         struct ost_body *body;
251         int rc, size = sizeof(*body);
252         ENTRY;
253
254         osc_con2cl(conn, &cl, &connection);
255         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
256                                   OST_SETATTR, 1, &size, NULL);
257         if (!request)
258                 RETURN(-ENOMEM);
259
260         body = lustre_msg_buf(request->rq_reqmsg, 0);
261         memcpy(&body->oa, oa, sizeof(*oa));
262
263         request->rq_replen = lustre_msg_size(1, &size);
264
265         rc = ptlrpc_queue_wait(request);
266         rc = ptlrpc_check_status(request, rc);
267         GOTO(out, rc);
268
269  out:
270         ptlrpc_free_req(request);
271         return 0;
272 }
273
274 static int osc_create(struct lustre_handle *conn, struct obdo *oa)
275 {
276         struct ptlrpc_request *request;
277         struct ptlrpc_client *cl;
278         struct ptlrpc_connection *connection;
279         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
280         struct ost_body *body;
281         struct mds_objid *objid;
282         struct lov_object_id *lov_id;
283         int rc, size = sizeof(*body);
284         ENTRY;
285
286         if (!oa) {
287                 CERROR("oa NULL\n");
288                 RETURN(-EINVAL);
289         }
290         osc_con2cl(conn, &cl, &connection);
291         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
292                                   OST_CREATE, 1, &size, NULL);
293         if (!request)
294                 RETURN(-ENOMEM);
295
296         body = lustre_msg_buf(request->rq_reqmsg, 0);
297         memcpy(&body->oa, oa, sizeof(*oa));
298
299         request->rq_replen = lustre_msg_size(1, &size);
300
301         rc = ptlrpc_queue_wait(request);
302         rc = ptlrpc_check_status(request, rc);
303         if (rc)
304                 GOTO(out, rc);
305
306         body = lustre_msg_buf(request->rq_repmsg, 0);
307         memcpy(oa, &body->oa, sizeof(*oa));
308
309         memset(oa->o_inline, 0, sizeof(oa->o_inline));
310         objid = (struct mds_objid *)oa->o_inline;
311         objid->mo_lov_md.lmd_object_id = oa->o_id;
312         objid->mo_lov_md.lmd_stripe_count = 1;
313         lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
314         lov_id->l_device_id = 0;
315         lov_id->l_object_id = oa->o_id;
316
317         EXIT;
318  out:
319         ptlrpc_free_req(request);
320         return 0;
321 }
322
323 static int osc_punch(struct lustre_handle *conn, struct obdo *oa, obd_size count,
324                      obd_off offset)
325 {
326         struct ptlrpc_request *request;
327         struct ptlrpc_client *cl;
328         struct ptlrpc_connection *connection;
329         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
330         struct ost_body *body;
331         int rc, size = sizeof(*body);
332         ENTRY;
333
334         if (!oa) {
335                 CERROR("oa NULL\n");
336                 RETURN(-EINVAL);
337         }
338         osc_con2cl(conn, &cl, &connection);
339         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
340                                    OST_PUNCH, 1, &size, NULL);
341         if (!request)
342                 RETURN(-ENOMEM);
343
344         body = lustre_msg_buf(request->rq_reqmsg, 0);
345         memcpy(&body->oa, oa, sizeof(*oa));
346         body->oa.o_blocks = count;
347         body->oa.o_valid |= OBD_MD_FLBLOCKS;
348
349         request->rq_replen = lustre_msg_size(1, &size);
350
351         rc = ptlrpc_queue_wait(request);
352         rc = ptlrpc_check_status(request, rc);
353         if (rc)
354                 GOTO(out, rc);
355
356         body = lustre_msg_buf(request->rq_repmsg, 0);
357         memcpy(oa, &body->oa, sizeof(*oa));
358
359         EXIT;
360  out:
361         ptlrpc_free_req(request);
362         return 0;
363 }
364
365 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa)
366 {
367         struct ptlrpc_request *request;
368         struct ptlrpc_client *cl;
369         struct ptlrpc_connection *connection;
370         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
371         struct ost_body *body;
372         int rc, size = sizeof(*body);
373         ENTRY;
374
375         if (!oa) {
376                 CERROR("oa NULL\n");
377                 RETURN(-EINVAL);
378         }
379         osc_con2cl(conn, &cl, &connection);
380         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
381                                    OST_DESTROY, 1, &size, NULL);
382         if (!request)
383                 RETURN(-ENOMEM);
384
385         body = lustre_msg_buf(request->rq_reqmsg, 0);
386         memcpy(&body->oa, oa, sizeof(*oa));
387         body->oa.o_valid = ~0;
388
389         request->rq_replen = lustre_msg_size(1, &size);
390
391         rc = ptlrpc_queue_wait(request);
392         rc = ptlrpc_check_status(request, rc);
393         if (rc)
394                 GOTO(out, rc);
395
396         body = lustre_msg_buf(request->rq_repmsg, 0);
397         memcpy(oa, &body->oa, sizeof(*oa));
398
399         EXIT;
400  out:
401         ptlrpc_free_req(request);
402         return 0;
403 }
404
405 struct osc_brw_cb_data {
406         struct page **buf;
407         struct ptlrpc_request *req;
408         bulk_callback_t callback;
409         void *cb_data;
410 };
411
412 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
413 {
414         struct osc_brw_cb_data *cb_data = data;
415
416         if (desc->b_flags & PTL_RPC_FL_INTR)
417                 CERROR("got signal\n");
418
419         (cb_data->callback)(desc, cb_data->cb_data);
420
421         ptlrpc_free_bulk(desc);
422         ptlrpc_free_req(cb_data->req);
423
424         OBD_FREE(cb_data, sizeof(*cb_data));
425 }
426
427 static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
428                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
429                         obd_size *count, obd_off *offset, obd_flag *flags,
430                         bulk_callback_t callback)
431 {
432         struct ptlrpc_client *cl;
433         struct ptlrpc_connection *connection;
434         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
435         struct ptlrpc_request *request;
436         struct ost_body *body;
437         struct list_head *tmp;
438         int pages, rc, i, j, size[3] = {sizeof(*body)};
439         void *ptr1, *ptr2;
440         struct ptlrpc_bulk_desc *desc;
441         ENTRY;
442
443         size[1] = num_oa * sizeof(struct obd_ioobj);
444         pages = 0;
445         for (i = 0; i < num_oa; i++)
446                 pages += oa_bufs[i];
447         size[2] = pages * sizeof(struct niobuf_remote);
448
449         osc_con2cl(conn, &cl, &connection);
450         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
451                                   OST_BRW, 3, size, NULL);
452         if (!request)
453                 GOTO(out, rc = -ENOMEM);
454
455         body = lustre_msg_buf(request->rq_reqmsg, 0);
456         body->data = OBD_BRW_READ;
457
458         desc = ptlrpc_prep_bulk(connection);
459         if (!desc)
460                 GOTO(out2, rc = -ENOMEM);
461         desc->b_portal = OST_BULK_PORTAL;
462
463         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
464         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
465         for (pages = 0, i = 0; i < num_oa; i++) {
466                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
467                 /* FIXME: this inner loop is wrong for multiple OAs */
468                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
469                         struct ptlrpc_bulk_page *bulk;
470                         bulk = ptlrpc_prep_bulk_page(desc);
471                         if (bulk == NULL)
472                                 GOTO(out3, rc = -ENOMEM);
473
474                         spin_lock(&connection->c_lock);
475                         bulk->b_xid = ++connection->c_xid_out;
476                         spin_unlock(&connection->c_lock);
477
478                         bulk->b_buf = kmap(buf[pages]);
479                         bulk->b_page = buf[pages];
480                         bulk->b_buflen = PAGE_SIZE;
481                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
482                                         flags[pages], bulk->b_xid);
483                 }
484         }
485
486         rc = ptlrpc_register_bulk(desc);
487         if (rc)
488                 GOTO(out3, rc);
489
490         request->rq_replen = lustre_msg_size(1, size);
491         rc = ptlrpc_queue_wait(request);
492         rc = ptlrpc_check_status(request, rc);
493         if (rc)
494                 ptlrpc_abort_bulk(desc);
495         GOTO(out3, rc);
496
497  out3:
498         list_for_each(tmp, &desc->b_page_list) {
499                 struct ptlrpc_bulk_page *bulk;
500                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
501                 if (bulk->b_page != NULL)
502                         kunmap(bulk->b_page);
503         }
504         ptlrpc_free_bulk(desc);
505  out2:
506         ptlrpc_free_req(request);
507  out:
508         return rc;
509 }
510
511 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
512 {
513         struct osc_brw_cb_data *cb_data = data;
514         int i;
515         ENTRY;
516
517         if (desc->b_flags & PTL_RPC_FL_INTR)
518                 CERROR("got signal\n");
519
520         for (i = 0; i < desc->b_page_count; i++)
521                 kunmap(cb_data->buf[i]);
522
523         (cb_data->callback)(desc, cb_data->cb_data);
524
525         ptlrpc_free_bulk(desc);
526         ptlrpc_free_req(cb_data->req);
527
528         OBD_FREE(cb_data, sizeof(*cb_data));
529         EXIT;
530 }
531
532 static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
533                          struct obdo **oa, obd_count *oa_bufs,
534                          struct page **pagearray, obd_size *count,
535                          obd_off *offset, obd_flag *flags,
536                          bulk_callback_t callback)
537 {
538         struct ptlrpc_client *cl;
539         struct ptlrpc_connection *connection;
540         struct ptlrpc_request *request;
541         struct ptlrpc_bulk_desc *desc;
542         struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
543         struct obd_ioobj ioo;
544         struct ost_body *body;
545         struct niobuf_local *local;
546         struct niobuf_remote *remote;
547         struct osc_brw_cb_data *cb_data;
548         long pages;
549         int rc, i, j, size[3] = {sizeof(*body)};
550         void *ptr1, *ptr2;
551         ENTRY;
552
553         size[1] = num_oa * sizeof(ioo);
554         pages = 0;
555         for (i = 0; i < num_oa; i++)
556                 pages += oa_bufs[i];
557         size[2] = pages * sizeof(*remote);
558
559         OBD_ALLOC(local, pages * sizeof(*local));
560         if (local == NULL)
561                 RETURN(-ENOMEM);
562
563         osc_con2cl(conn, &cl, &connection);
564         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
565                                   OST_BRW, 3, size, NULL);
566         if (!request)
567                 GOTO(out, rc = -ENOMEM);
568         body = lustre_msg_buf(request->rq_reqmsg, 0);
569         body->data = OBD_BRW_WRITE;
570
571         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
572         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
573         for (pages = 0, i = 0; i < num_oa; i++) {
574                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
575                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
576                         local[pages].addr = kmap(pagearray[pages]);
577                         local[pages].offset = offset[pages];
578                         local[pages].len = count[pages];
579                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
580                                         flags[pages], 0);
581                 }
582         }
583
584         size[1] = pages * sizeof(struct niobuf_remote);
585         request->rq_replen = lustre_msg_size(2, size);
586
587         rc = ptlrpc_queue_wait(request);
588         rc = ptlrpc_check_status(request, rc);
589         if (rc)
590                 GOTO(out2, rc);
591
592         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
593         if (ptr2 == NULL)
594                 GOTO(out2, rc = -EINVAL);
595
596         if (request->rq_repmsg->buflens[1] !=
597             pages * sizeof(struct niobuf_remote)) {
598                 CERROR("buffer length wrong (%d vs. %ld)\n",
599                        request->rq_repmsg->buflens[1],
600                        pages * sizeof(struct niobuf_remote));
601                 GOTO(out2, rc = -EINVAL);
602         }
603
604         desc = ptlrpc_prep_bulk(connection);
605         desc->b_portal = OSC_BULK_PORTAL;
606         if (callback) {
607                 desc->b_cb = brw_write_finish;
608                 OBD_ALLOC(cb_data, sizeof(*cb_data));
609                 cb_data->buf = pagearray;
610                 cb_data->callback = callback;
611                 desc->b_cb_data = cb_data;
612         }
613
614         for (pages = 0, i = 0; i < num_oa; i++) {
615                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
616                         struct ptlrpc_bulk_page *page;
617
618                         ost_unpack_niobuf(&ptr2, &remote);
619
620                         page = ptlrpc_prep_bulk_page(desc);
621                         if (page == NULL)
622                                 GOTO(out3, rc = -ENOMEM);
623
624                         page->b_buf = (void *)(unsigned long)local[pages].addr;
625                         page->b_buflen = local[pages].len;
626                         page->b_xid = remote->xid;
627                 }
628         }
629
630         if (desc->b_page_count != pages)
631                 LBUG();
632
633         rc = ptlrpc_send_bulk(desc);
634         if (rc)
635                 GOTO(out3, rc);
636         if (callback)
637                 GOTO(out, rc);
638
639         /* If there's no callback function, sleep here until complete. */
640         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
641         if (desc->b_flags & PTL_RPC_FL_INTR)
642                 rc = -EINTR;
643
644         GOTO(out3, rc);
645
646  out3:
647         ptlrpc_free_bulk(desc);
648  out2:
649         ptlrpc_free_req(request);
650         for (pages = 0, i = 0; i < num_oa; i++)
651                 for (j = 0; j < oa_bufs[i]; j++, pages++)
652                         kunmap(pagearray[pages]);
653  out:
654         OBD_FREE(local, pages * sizeof(*local));
655
656         return rc;
657 }
658
659 static int osc_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
660                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
661                    obd_size *count, obd_off *offset, obd_flag *flags,
662                    void *callback)
663 {
664         if (num_oa != 1)
665                 LBUG();
666
667         if (cmd & OBD_BRW_WRITE)
668                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
669                                      offset, flags, (bulk_callback_t)callback);
670         else
671                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
672                                     offset, flags, (bulk_callback_t)callback);
673 }
674
675 static int osc_enqueue(struct lustre_handle *oconn,
676                        struct lustre_handle *parent_lock, __u64 *res_id,
677                        __u32 type, void *extentp, int extent_len, __u32 mode,
678                        int *flags, void *callback, void *data, int datalen,
679                        struct lustre_handle *lockh)
680 {
681         struct obd_device *obddev = class_conn2obd(oconn);
682         struct osc_obd *osc = &obddev->u.osc;
683         struct ptlrpc_connection *conn;
684         struct ptlrpc_client *cl;
685         struct ldlm_extent *extent = extentp;
686         int rc;
687         __u32 mode2;
688
689         /* Filesystem locks are given a bit of special treatment: first we
690          * fixup the lock to start and end on page boundaries. */
691         extent->start &= PAGE_MASK;
692         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
693
694         /* Next, search for already existing extent locks that will cover us */
695         osc_con2dlmcl(oconn, &cl, &conn);
696         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
697                              sizeof(extent), mode, lockh);
698         if (rc == 1) {
699                 /* We already have a lock, and it's referenced */
700                 return 0;
701         }
702
703         /* Next, search for locks that we can upgrade (if we're trying to write)
704          * or are more than we need (if we're trying to read).  Because the VFS
705          * and page cache already protect us locally, lots of readers/writers
706          * can share a single PW lock. */
707         if (mode == LCK_PW)
708                 mode2 = LCK_PR;
709         else
710                 mode2 = LCK_PW;
711
712         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
713                              sizeof(extent), mode2, lockh);
714         if (rc == 1) {
715                 int flags;
716                 /* FIXME: This is not incredibly elegant, but it might
717                  * be more elegant than adding another parameter to
718                  * lock_match.  I want a second opinion. */
719                 ldlm_lock_addref(lockh, mode);
720                 ldlm_lock_decref(lockh, mode2);
721
722                 if (mode == LCK_PR)
723                         return 0;
724
725                 rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh, 
726                                       mode, &flags);
727                 if (rc)
728                         LBUG();
729
730                 return rc;
731         }
732
733         rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh, 
734                               NULL, obddev->obd_namespace,
735                               parent_lock, res_id, type, extent, sizeof(extent),
736                               mode, flags, callback, data, datalen, lockh);
737         return rc;
738 }
739
740 static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
741                       struct lustre_handle *lockh)
742 {
743         ENTRY;
744
745         ldlm_lock_decref(lockh, mode);
746
747         RETURN(0);
748 }
749
750 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
751 {
752         struct obd_ioctl_data* data = buf;
753         struct osc_obd *osc = &obddev->u.osc;
754         char server_uuid[37];
755         int rc;
756         ENTRY;
757
758         if (data->ioc_inllen1 < 1) {
759                 CERROR("osc setup requires a TARGET UUID\n");
760                 RETURN(-EINVAL);
761         }
762
763         if (data->ioc_inllen1 > 37) {
764                 CERROR("osc TARGET UUID must be less than 38 characters\n");
765                 RETURN(-EINVAL);
766         }
767
768         if (data->ioc_inllen2 < 1) {
769                 CERROR("osc setup requires a SERVER UUID\n");
770                 RETURN(-EINVAL);
771         }
772
773         if (data->ioc_inllen2 > 37) {
774                 CERROR("osc SERVER UUID must be less than 38 characters\n");
775                 RETURN(-EINVAL);
776         }
777
778         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
779         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
780                                                    sizeof(server_uuid)));
781
782         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
783         if (!osc->osc_conn)
784                 RETURN(-ENOENT);
785
786         obddev->obd_namespace =
787                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
788         if (obddev->obd_namespace == NULL)
789                 GOTO(out_conn, rc = -ENOMEM);
790
791         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
792         if (osc->osc_client == NULL)
793                 GOTO(out_ns, rc = -ENOMEM);
794
795         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
796         if (osc->osc_ldlm_client == NULL)
797                 GOTO(out_client, rc = -ENOMEM);
798
799         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
800                            osc->osc_client);
801         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
802                            osc->osc_ldlm_client);
803         osc->osc_client->cli_name = "osc";
804         osc->osc_ldlm_client->cli_name = "ldlm";
805
806         MOD_INC_USE_COUNT;
807         RETURN(0);
808
809  out_client:
810         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
811  out_ns:
812         ldlm_namespace_free(obddev->obd_namespace);
813  out_conn:
814         ptlrpc_put_connection(osc->osc_conn);
815         return rc;
816 }
817
818 static int osc_cleanup(struct obd_device * obddev)
819 {
820         struct osc_obd *osc = &obddev->u.osc;
821
822         ldlm_namespace_free(obddev->obd_namespace);
823
824         ptlrpc_cleanup_client(osc->osc_client);
825         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
826         ptlrpc_cleanup_client(osc->osc_ldlm_client);
827         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
828         ptlrpc_put_connection(osc->osc_conn);
829
830         MOD_DEC_USE_COUNT;
831         return 0;
832 }
833
#if 0
/*
 * Disabled: query filesystem statistics from the OST and unpack them into
 * @sfs.  Kept compiled out; note that it still calls ptlrpc_prep_req()
 * with an argument list that differs from the one osc_connect() uses, so
 * it needs updating before it can be re-enabled.
 *
 * Returns 0 on success, -ENOMEM if the request cannot be allocated, or the
 * error reported by the RPC.
 */
static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
{
        struct ptlrpc_request *request;
        struct ptlrpc_client *cl;
        struct ptlrpc_connection *connection;
        struct obd_statfs *osfs;
        int rc, size = sizeof(*osfs);
        ENTRY;

        osc_con2cl(conn, &cl, &connection);
        request = ptlrpc_prep_req(cl, connection, OST_STATFS, 0, NULL, NULL);
        if (!request)
                RETURN(-ENOMEM);

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc)
                GOTO(out, rc);

        osfs = lustre_msg_buf(request->rq_repmsg, 0);
        obd_statfs_unpack(sfs, osfs);

        EXIT;
 out:
        ptlrpc_free_req(request);
        /* propagate the RPC result instead of masking failures with 0 */
        return rc;
}
#endif
865
866 struct obd_ops osc_obd_ops = {
867         o_setup:   osc_setup,
868         o_cleanup: osc_cleanup,
869         o_create: osc_create,
870         o_destroy: osc_destroy,
871         o_getattr: osc_getattr,
872         o_setattr: osc_setattr,
873         o_open: osc_open,
874         o_close: osc_close,
875         o_connect: osc_connect,
876         o_disconnect: osc_disconnect,
877         o_brw: osc_brw,
878         o_punch: osc_punch,
879         o_enqueue: osc_enqueue,
880         o_cancel: osc_cancel
881 };
882
883 static int __init osc_init(void)
884 {
885         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
886 }
887
888 static void __exit osc_exit(void)
889 {
890         class_unregister_type(LUSTRE_OSC_NAME);
891 }
892
893 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
894 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
895 MODULE_LICENSE("GPL");
896
897 module_init(osc_init);
898 module_exit(osc_exit);