Whamcloud - gitweb
- The same changes made to the OST are now working in the MDS
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27
28 static void osc_obd2cl(struct obd_device *obd, struct ptlrpc_client **cl,
29                        struct ptlrpc_connection **connection)
30 {
31         struct osc_obd *osc = &obd->u.osc;
32         *cl = osc->osc_client;
33         *connection = osc->osc_conn;
34 }
35
36 static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl,
37                        struct ptlrpc_connection **connection)
38 {
39         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
40         *cl = osc->osc_client;
41         *connection = osc->osc_conn;
42 }
43
44 static void osc_con2dlmcl(struct obd_conn *conn, struct ptlrpc_client **cl,
45                           struct ptlrpc_connection **connection)
46 {
47         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
48         *cl = osc->osc_ldlm_client;
49         *connection = osc->osc_conn;
50 }
51
52 static int osc_connect(struct obd_conn *conn, struct obd_device *obd)
53 {
54         struct osc_obd *osc = &obd->u.osc;
55         struct obd_import *import;
56         struct ptlrpc_request *request;
57         struct ptlrpc_client *cl;
58         struct ptlrpc_connection *connection;
59         char *tmp = osc->osc_target_uuid;
60         int rc, size = sizeof(osc->osc_target_uuid);
61         ENTRY;
62
63         OBD_ALLOC(import, sizeof(*import)); 
64         if (!import)
65                 RETURN(-ENOMEM);
66                   
67         MOD_INC_USE_COUNT;
68         rc = gen_connect(conn, obd);
69         if (rc) 
70                 GOTO(out, rc);
71
72         osc_obd2cl(obd, &cl, &connection);
73         request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn, 
74                                   OST_CONNECT, 1, &size, &tmp);
75         if (!request)
76                 RETURN(-ENOMEM);
77
78         request->rq_replen = lustre_msg_size(0, NULL);
79
80         rc = ptlrpc_queue_wait(request);
81         rc = ptlrpc_check_status(request, rc);
82         if (rc) {
83                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
84                 GOTO(out, rc);
85         }
86
87         /* XXX: Make this a handle */
88         osc->osc_connh.addr = request->rq_repmsg->addr;
89         osc->osc_connh.cookie = request->rq_repmsg->cookie;
90
91         EXIT;
92  out:
93         ptlrpc_free_req(request);
94         if (rc)
95                 MOD_DEC_USE_COUNT;
96         return rc;
97 }
98
99 static int osc_disconnect(struct obd_conn *conn)
100 {
101         struct ptlrpc_request *request;
102         struct ptlrpc_client *cl;
103         struct ptlrpc_connection *connection;
104         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
105         int rc;
106         ENTRY;
107
108         osc_con2cl(conn, &cl, &connection);
109         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
110                                   OST_DISCONNECT, 0, NULL, NULL);
111         if (!request)
112                 RETURN(-ENOMEM);
113         request->rq_replen = lustre_msg_size(0, NULL);
114
115         rc = ptlrpc_queue_wait(request);
116         if (rc) 
117                 GOTO(out, rc);
118         rc = gen_disconnect(conn);
119         if (!rc)
120                 MOD_DEC_USE_COUNT;
121
122  out:
123         ptlrpc_free_req(request);
124         return rc;
125 }
126
127 static int osc_getattr(struct obd_conn *conn, struct obdo *oa)
128 {
129         struct ptlrpc_request *request;
130         struct ptlrpc_client *cl;
131         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
132         struct ptlrpc_connection *connection;
133         struct ost_body *body;
134         int rc, size = sizeof(*body);
135         ENTRY;
136
137         osc_con2cl(conn, &cl, &connection);
138         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
139                                    OST_GETATTR, 1, &size, NULL);
140         if (!request)
141                 RETURN(-ENOMEM);
142
143         body = lustre_msg_buf(request->rq_reqmsg, 0);
144         memcpy(&body->oa, oa, sizeof(*oa));
145         body->connid = conn->oc_id;
146         body->oa.o_valid = ~0;
147
148         request->rq_replen = lustre_msg_size(1, &size);
149
150         rc = ptlrpc_queue_wait(request);
151         rc = ptlrpc_check_status(request, rc);
152         if (rc) {
153                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
154                 GOTO(out, rc);
155         }
156
157         body = lustre_msg_buf(request->rq_repmsg, 0);
158         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
159         if (oa)
160                 memcpy(oa, &body->oa, sizeof(*oa));
161
162         EXIT;
163  out:
164         ptlrpc_free_req(request);
165         return 0;
166 }
167
168 static int osc_open(struct obd_conn *conn, struct obdo *oa)
169 {
170         struct ptlrpc_request *request;
171         struct ptlrpc_client *cl;
172         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
173         struct ptlrpc_connection *connection;
174         struct ost_body *body;
175         int rc, size = sizeof(*body);
176         ENTRY;
177
178         osc_con2cl(conn, &cl, &connection);
179         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
180                                    OST_OPEN, 1, &size, NULL);
181         if (!request)
182                 RETURN(-ENOMEM);
183
184         body = lustre_msg_buf(request->rq_reqmsg, 0);
185         memcpy(&body->oa, oa, sizeof(*oa));
186         body->connid = conn->oc_id;
187         body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
188
189         request->rq_replen = lustre_msg_size(1, &size);
190
191         rc = ptlrpc_queue_wait(request);
192         rc = ptlrpc_check_status(request, rc);
193         if (rc)
194                 GOTO(out, rc);
195
196         body = lustre_msg_buf(request->rq_repmsg, 0);
197         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
198         if (oa)
199                 memcpy(oa, &body->oa, sizeof(*oa));
200
201         EXIT;
202  out:
203         ptlrpc_free_req(request);
204         return 0;
205 }
206
207 static int osc_close(struct obd_conn *conn, struct obdo *oa)
208 {
209         struct ptlrpc_request *request;
210         struct ptlrpc_client *cl;
211         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
212         struct ptlrpc_connection *connection;
213         struct ost_body *body;
214         int rc, size = sizeof(*body);
215         ENTRY;
216
217         osc_con2cl(conn, &cl, &connection);
218         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
219                                    OST_CLOSE, 1, &size, NULL);
220         if (!request)
221                 RETURN(-ENOMEM);
222
223         body = lustre_msg_buf(request->rq_reqmsg, 0);
224         memcpy(&body->oa, oa, sizeof(*oa));
225         body->connid = conn->oc_id;
226
227         request->rq_replen = lustre_msg_size(1, &size);
228
229         rc = ptlrpc_queue_wait(request);
230         rc = ptlrpc_check_status(request, rc);
231         if (rc)
232                 GOTO(out, rc);
233
234         body = lustre_msg_buf(request->rq_repmsg, 0);
235         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
236         if (oa)
237                 memcpy(oa, &body->oa, sizeof(*oa));
238
239         EXIT;
240  out:
241         ptlrpc_free_req(request);
242         return 0;
243 }
244
245 static int osc_setattr(struct obd_conn *conn, struct obdo *oa)
246 {
247         struct ptlrpc_request *request;
248         struct ptlrpc_client *cl;
249         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
250         struct ptlrpc_connection *connection;
251         struct ost_body *body;
252         int rc, size = sizeof(*body);
253         ENTRY;
254
255         osc_con2cl(conn, &cl, &connection);
256         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
257                                   OST_SETATTR, 1, &size, NULL);
258         if (!request)
259                 RETURN(-ENOMEM);
260
261         body = lustre_msg_buf(request->rq_reqmsg, 0);
262         memcpy(&body->oa, oa, sizeof(*oa));
263         body->connid = conn->oc_id;
264
265         request->rq_replen = lustre_msg_size(1, &size);
266
267         rc = ptlrpc_queue_wait(request);
268         rc = ptlrpc_check_status(request, rc);
269         GOTO(out, rc);
270
271  out:
272         ptlrpc_free_req(request);
273         return 0;
274 }
275
276 static int osc_create(struct obd_conn *conn, struct obdo *oa)
277 {
278         struct ptlrpc_request *request;
279         struct ptlrpc_client *cl;
280         struct ptlrpc_connection *connection;
281         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
282         struct ost_body *body;
283         struct mds_objid *objid;
284         struct lov_object_id *lov_id;
285         int rc, size = sizeof(*body);
286         ENTRY;
287
288         if (!oa) {
289                 CERROR("oa NULL\n");
290                 RETURN(-EINVAL);
291         }
292         osc_con2cl(conn, &cl, &connection);
293         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
294                                   OST_CREATE, 1, &size, NULL);
295         if (!request)
296                 RETURN(-ENOMEM);
297
298         body = lustre_msg_buf(request->rq_reqmsg, 0);
299         memcpy(&body->oa, oa, sizeof(*oa));
300         body->connid = conn->oc_id;
301
302         request->rq_replen = lustre_msg_size(1, &size);
303
304         rc = ptlrpc_queue_wait(request);
305         rc = ptlrpc_check_status(request, rc);
306         if (rc)
307                 GOTO(out, rc);
308
309         body = lustre_msg_buf(request->rq_repmsg, 0);
310         memcpy(oa, &body->oa, sizeof(*oa));
311
312         memset(oa->o_inline, 0, sizeof(oa->o_inline));
313         objid = (struct mds_objid *)oa->o_inline;
314         objid->mo_lov_md.lmd_object_id = oa->o_id;
315         objid->mo_lov_md.lmd_stripe_count = 1;
316         lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
317         lov_id->l_device_id = 0;
318         lov_id->l_object_id = oa->o_id;
319
320         EXIT;
321  out:
322         ptlrpc_free_req(request);
323         return 0;
324 }
325
326 static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count,
327                      obd_off offset)
328 {
329         struct ptlrpc_request *request;
330         struct ptlrpc_client *cl;
331         struct ptlrpc_connection *connection;
332         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
333         struct ost_body *body;
334         int rc, size = sizeof(*body);
335         ENTRY;
336
337         if (!oa) {
338                 CERROR("oa NULL\n");
339                 RETURN(-EINVAL);
340         }
341         osc_con2cl(conn, &cl, &connection);
342         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
343                                    OST_PUNCH, 1, &size, NULL);
344         if (!request)
345                 RETURN(-ENOMEM);
346
347         body = lustre_msg_buf(request->rq_reqmsg, 0);
348         memcpy(&body->oa, oa, sizeof(*oa));
349         body->connid = conn->oc_id;
350         body->oa.o_blocks = count;
351         body->oa.o_valid |= OBD_MD_FLBLOCKS;
352
353         request->rq_replen = lustre_msg_size(1, &size);
354
355         rc = ptlrpc_queue_wait(request);
356         rc = ptlrpc_check_status(request, rc);
357         if (rc)
358                 GOTO(out, rc);
359
360         body = lustre_msg_buf(request->rq_repmsg, 0);
361         memcpy(oa, &body->oa, sizeof(*oa));
362
363         EXIT;
364  out:
365         ptlrpc_free_req(request);
366         return 0;
367 }
368
369 static int osc_destroy(struct obd_conn *conn, struct obdo *oa)
370 {
371         struct ptlrpc_request *request;
372         struct ptlrpc_client *cl;
373         struct ptlrpc_connection *connection;
374         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
375         struct ost_body *body;
376         int rc, size = sizeof(*body);
377         ENTRY;
378
379         if (!oa) {
380                 CERROR("oa NULL\n");
381                 RETURN(-EINVAL);
382         }
383         osc_con2cl(conn, &cl, &connection);
384         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
385                                    OST_DESTROY, 1, &size, NULL);
386         if (!request)
387                 RETURN(-ENOMEM);
388
389         body = lustre_msg_buf(request->rq_reqmsg, 0);
390         memcpy(&body->oa, oa, sizeof(*oa));
391         body->connid = conn->oc_id;
392         body->oa.o_valid = ~0;
393
394         request->rq_replen = lustre_msg_size(1, &size);
395
396         rc = ptlrpc_queue_wait(request);
397         rc = ptlrpc_check_status(request, rc);
398         if (rc)
399                 GOTO(out, rc);
400
401         body = lustre_msg_buf(request->rq_repmsg, 0);
402         memcpy(oa, &body->oa, sizeof(*oa));
403
404         EXIT;
405  out:
406         ptlrpc_free_req(request);
407         return 0;
408 }
409
410 struct osc_brw_cb_data {
411         struct page **buf;
412         struct ptlrpc_request *req;
413         bulk_callback_t callback;
414         void *cb_data;
415 };
416
417 static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
418 {
419         struct osc_brw_cb_data *cb_data = data;
420
421         if (desc->b_flags & PTL_RPC_FL_INTR)
422                 CERROR("got signal\n");
423
424         (cb_data->callback)(desc, cb_data->cb_data);
425
426         ptlrpc_free_bulk(desc);
427         ptlrpc_free_req(cb_data->req);
428
429         OBD_FREE(cb_data, sizeof(*cb_data));
430 }
431
432 static int osc_brw_read(struct obd_conn *conn, obd_count num_oa,
433                         struct obdo **oa, obd_count *oa_bufs, struct page **buf,
434                         obd_size *count, obd_off *offset, obd_flag *flags,
435                         bulk_callback_t callback)
436 {
437         struct ptlrpc_client *cl;
438         struct ptlrpc_connection *connection;
439         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
440         struct ptlrpc_request *request;
441         struct ost_body *body;
442         struct list_head *tmp;
443         int pages, rc, i, j, size[3] = {sizeof(*body)};
444         void *ptr1, *ptr2;
445         struct ptlrpc_bulk_desc *desc;
446         ENTRY;
447
448         size[1] = num_oa * sizeof(struct obd_ioobj);
449         pages = 0;
450         for (i = 0; i < num_oa; i++)
451                 pages += oa_bufs[i];
452         size[2] = pages * sizeof(struct niobuf_remote);
453
454         osc_con2cl(conn, &cl, &connection);
455         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
456                                   OST_BRW, 3, size, NULL);
457         if (!request)
458                 GOTO(out, rc = -ENOMEM);
459
460         body = lustre_msg_buf(request->rq_reqmsg, 0);
461         body->data = OBD_BRW_READ;
462
463         desc = ptlrpc_prep_bulk(connection);
464         if (!desc)
465                 GOTO(out2, rc = -ENOMEM);
466         desc->b_portal = OST_BULK_PORTAL;
467
468         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
469         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
470         for (pages = 0, i = 0; i < num_oa; i++) {
471                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
472                 /* FIXME: this inner loop is wrong for multiple OAs */
473                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
474                         struct ptlrpc_bulk_page *bulk;
475                         bulk = ptlrpc_prep_bulk_page(desc);
476                         if (bulk == NULL)
477                                 GOTO(out3, rc = -ENOMEM);
478
479                         spin_lock(&connection->c_lock);
480                         bulk->b_xid = ++connection->c_xid_out;
481                         spin_unlock(&connection->c_lock);
482
483                         bulk->b_buf = kmap(buf[pages]);
484                         bulk->b_page = buf[pages];
485                         bulk->b_buflen = PAGE_SIZE;
486                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
487                                         flags[pages], bulk->b_xid);
488                 }
489         }
490
491         rc = ptlrpc_register_bulk(desc);
492         if (rc)
493                 GOTO(out3, rc);
494
495         request->rq_replen = lustre_msg_size(1, size);
496         rc = ptlrpc_queue_wait(request);
497         rc = ptlrpc_check_status(request, rc);
498         if (rc)
499                 ptlrpc_abort_bulk(desc);
500         GOTO(out3, rc);
501
502  out3:
503         list_for_each(tmp, &desc->b_page_list) {
504                 struct ptlrpc_bulk_page *bulk;
505                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
506                 if (bulk->b_page != NULL)
507                         kunmap(bulk->b_page);
508         }
509         ptlrpc_free_bulk(desc);
510  out2:
511         ptlrpc_free_req(request);
512  out:
513         return rc;
514 }
515
516 static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
517 {
518         struct osc_brw_cb_data *cb_data = data;
519         int i;
520         ENTRY;
521
522         if (desc->b_flags & PTL_RPC_FL_INTR)
523                 CERROR("got signal\n");
524
525         for (i = 0; i < desc->b_page_count; i++)
526                 kunmap(cb_data->buf[i]);
527
528         (cb_data->callback)(desc, cb_data->cb_data);
529
530         ptlrpc_free_bulk(desc);
531         ptlrpc_free_req(cb_data->req);
532
533         OBD_FREE(cb_data, sizeof(*cb_data));
534         EXIT;
535 }
536
537 static int osc_brw_write(struct obd_conn *conn, obd_count num_oa,
538                          struct obdo **oa, obd_count *oa_bufs,
539                          struct page **pagearray, obd_size *count,
540                          obd_off *offset, obd_flag *flags,
541                          bulk_callback_t callback)
542 {
543         struct ptlrpc_client *cl;
544         struct ptlrpc_connection *connection;
545         struct ptlrpc_request *request;
546         struct ptlrpc_bulk_desc *desc;
547         struct osc_obd *osc = &gen_conn2obd(conn)->u.osc;
548         struct obd_ioobj ioo;
549         struct ost_body *body;
550         struct niobuf_local *local;
551         struct niobuf_remote *remote;
552         struct osc_brw_cb_data *cb_data;
553         long pages;
554         int rc, i, j, size[3] = {sizeof(*body)};
555         void *ptr1, *ptr2;
556         ENTRY;
557
558         size[1] = num_oa * sizeof(ioo);
559         pages = 0;
560         for (i = 0; i < num_oa; i++)
561                 pages += oa_bufs[i];
562         size[2] = pages * sizeof(*remote);
563
564         OBD_ALLOC(local, pages * sizeof(*local));
565         if (local == NULL)
566                 RETURN(-ENOMEM);
567
568         osc_con2cl(conn, &cl, &connection);
569         request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
570                                   OST_BRW, 3, size, NULL);
571         if (!request)
572                 GOTO(out, rc = -ENOMEM);
573         body = lustre_msg_buf(request->rq_reqmsg, 0);
574         body->data = OBD_BRW_WRITE;
575
576         ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
577         ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
578         for (pages = 0, i = 0; i < num_oa; i++) {
579                 ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
580                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
581                         local[pages].addr = kmap(pagearray[pages]);
582                         local[pages].offset = offset[pages];
583                         local[pages].len = count[pages];
584                         ost_pack_niobuf(&ptr2, offset[pages], count[pages],
585                                         flags[pages], 0);
586                 }
587         }
588
589         size[1] = pages * sizeof(struct niobuf_remote);
590         request->rq_replen = lustre_msg_size(2, size);
591
592         rc = ptlrpc_queue_wait(request);
593         rc = ptlrpc_check_status(request, rc);
594         if (rc)
595                 GOTO(out2, rc);
596
597         ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
598         if (ptr2 == NULL)
599                 GOTO(out2, rc = -EINVAL);
600
601         if (request->rq_repmsg->buflens[1] !=
602             pages * sizeof(struct niobuf_remote)) {
603                 CERROR("buffer length wrong (%d vs. %ld)\n",
604                        request->rq_repmsg->buflens[1],
605                        pages * sizeof(struct niobuf_remote));
606                 GOTO(out2, rc = -EINVAL);
607         }
608
609         desc = ptlrpc_prep_bulk(connection);
610         desc->b_portal = OSC_BULK_PORTAL;
611         if (callback) {
612                 desc->b_cb = brw_write_finish;
613                 OBD_ALLOC(cb_data, sizeof(*cb_data));
614                 cb_data->buf = pagearray;
615                 cb_data->callback = callback;
616                 desc->b_cb_data = cb_data;
617         }
618
619         for (pages = 0, i = 0; i < num_oa; i++) {
620                 for (j = 0; j < oa_bufs[i]; j++, pages++) {
621                         struct ptlrpc_bulk_page *page;
622
623                         ost_unpack_niobuf(&ptr2, &remote);
624
625                         page = ptlrpc_prep_bulk_page(desc);
626                         if (page == NULL)
627                                 GOTO(out3, rc = -ENOMEM);
628
629                         page->b_buf = (void *)(unsigned long)local[pages].addr;
630                         page->b_buflen = local[pages].len;
631                         page->b_xid = remote->xid;
632                 }
633         }
634
635         if (desc->b_page_count != pages)
636                 LBUG();
637
638         rc = ptlrpc_send_bulk(desc);
639         if (rc)
640                 GOTO(out3, rc);
641         if (callback)
642                 GOTO(out, rc);
643
644         /* If there's no callback function, sleep here until complete. */
645         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
646         if (desc->b_flags & PTL_RPC_FL_INTR)
647                 rc = -EINTR;
648
649         GOTO(out3, rc);
650
651  out3:
652         ptlrpc_free_bulk(desc);
653  out2:
654         ptlrpc_free_req(request);
655         for (pages = 0, i = 0; i < num_oa; i++)
656                 for (j = 0; j < oa_bufs[i]; j++, pages++)
657                         kunmap(pagearray[pages]);
658  out:
659         OBD_FREE(local, pages * sizeof(*local));
660
661         return rc;
662 }
663
664 static int osc_brw(int cmd, struct obd_conn *conn, obd_count num_oa,
665                    struct obdo **oa, obd_count *oa_bufs, struct page **buf,
666                    obd_size *count, obd_off *offset, obd_flag *flags,
667                    void *callback)
668 {
669         if (num_oa != 1)
670                 LBUG();
671
672         if (cmd & OBD_BRW_WRITE)
673                 return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
674                                      offset, flags, (bulk_callback_t)callback);
675         else
676                 return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
677                                     offset, flags, (bulk_callback_t)callback);
678 }
679
680 static int osc_enqueue(struct obd_conn *oconn,
681                        struct lustre_handle *parent_lock, __u64 *res_id,
682                        __u32 type, void *extentp, int extent_len, __u32 mode,
683                        int *flags, void *callback, void *data, int datalen,
684                        struct lustre_handle *lockh)
685 {
686         struct obd_device *obddev = gen_conn2obd(oconn);
687         struct ptlrpc_connection *conn;
688         struct osc_obd *osc = &gen_conn2obd(oconn)->u.osc;
689         struct ptlrpc_client *cl;
690         struct ldlm_extent *extent = extentp;
691         int rc;
692         __u32 mode2;
693
694         /* Filesystem locks are given a bit of special treatment: first we
695          * fixup the lock to start and end on page boundaries. */
696         extent->start &= PAGE_MASK;
697         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
698
699         /* Next, search for already existing extent locks that will cover us */
700         osc_con2dlmcl(oconn, &cl, &conn);
701         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
702                              sizeof(extent), mode, lockh);
703         if (rc == 1) {
704                 /* We already have a lock, and it's referenced */
705                 return 0;
706         }
707
708         /* Next, search for locks that we can upgrade (if we're trying to write)
709          * or are more than we need (if we're trying to read).  Because the VFS
710          * and page cache already protect us locally, lots of readers/writers
711          * can share a single PW lock. */
712         if (mode == LCK_PW)
713                 mode2 = LCK_PR;
714         else
715                 mode2 = LCK_PW;
716
717         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
718                              sizeof(extent), mode2, lockh);
719         if (rc == 1) {
720                 int flags;
721                 /* FIXME: This is not incredibly elegant, but it might
722                  * be more elegant than adding another parameter to
723                  * lock_match.  I want a second opinion. */
724                 ldlm_lock_addref(lockh, mode);
725                 ldlm_lock_decref(lockh, mode2);
726
727                 if (mode == LCK_PR)
728                         return 0;
729
730                 rc = ldlm_cli_convert(cl, lockh, mode, &flags);
731                 if (rc)
732                         LBUG();
733
734                 return rc;
735         }
736
737         rc = ldlm_cli_enqueue(cl, conn, NULL, obddev->obd_namespace,
738                               parent_lock, res_id, type, extent, sizeof(extent),
739                               mode, flags, callback, data, datalen, lockh);
740         return rc;
741 }
742
743 static int osc_cancel(struct obd_conn *oconn, __u32 mode,
744                       struct lustre_handle *lockh)
745 {
746         ENTRY;
747
748         ldlm_lock_decref(lockh, mode);
749
750         RETURN(0);
751 }
752
753 static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
754 {
755         struct obd_ioctl_data* data = buf;
756         struct osc_obd *osc = &obddev->u.osc;
757         char server_uuid[37];
758         int rc;
759         ENTRY;
760
761         if (data->ioc_inllen1 < 1) {
762                 CERROR("osc setup requires a TARGET UUID\n");
763                 RETURN(-EINVAL);
764         }
765
766         if (data->ioc_inllen1 > 37) {
767                 CERROR("osc TARGET UUID must be less than 38 characters\n");
768                 RETURN(-EINVAL);
769         }
770
771         if (data->ioc_inllen2 < 1) {
772                 CERROR("osc setup requires a SERVER UUID\n");
773                 RETURN(-EINVAL);
774         }
775
776         if (data->ioc_inllen2 > 37) {
777                 CERROR("osc SERVER UUID must be less than 38 characters\n");
778                 RETURN(-EINVAL);
779         }
780
781         memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
782         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
783                                                    sizeof(server_uuid)));
784
785         osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
786         if (!osc->osc_conn)
787                 RETURN(-ENOENT);
788
789         obddev->obd_namespace =
790                 ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
791         if (obddev->obd_namespace == NULL)
792                 GOTO(out_conn, rc = -ENOMEM);
793
794         OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
795         if (osc->osc_client == NULL)
796                 GOTO(out_ns, rc = -ENOMEM);
797
798         OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
799         if (osc->osc_ldlm_client == NULL)
800                 GOTO(out_client, rc = -ENOMEM);
801
802         ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
803                            osc->osc_client);
804         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
805                            osc->osc_ldlm_client);
806         osc->osc_client->cli_name = "osc";
807         osc->osc_ldlm_client->cli_name = "ldlm";
808
809         MOD_INC_USE_COUNT;
810         RETURN(0);
811
812  out_client:
813         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
814  out_ns:
815         ldlm_namespace_free(obddev->obd_namespace);
816  out_conn:
817         ptlrpc_put_connection(osc->osc_conn);
818         return rc;
819 }
820
821 static int osc_cleanup(struct obd_device * obddev)
822 {
823         struct osc_obd *osc = &obddev->u.osc;
824
825         ldlm_namespace_free(obddev->obd_namespace);
826
827         ptlrpc_cleanup_client(osc->osc_client);
828         OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
829         ptlrpc_cleanup_client(osc->osc_ldlm_client);
830         OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
831         ptlrpc_put_connection(osc->osc_conn);
832
833         MOD_DEC_USE_COUNT;
834         return 0;
835 }
836
837 #if 0
838 static int osc_statfs(struct obd_conn *conn, struct statfs *sfs);
839 {
840         struct ptlrpc_request *request;
841         struct ptlrpc_client *cl;
842         struct ptlrpc_connection *connection;
843         struct obd_statfs *osfs;
844         int rc, size = sizeof(*osfs);
845         ENTRY;
846
847         osc_con2cl(conn, &cl, &connection);
848         request = ptlrpc_prep_req(cl, connection, OST_STATFS, 0, NULL, NULL);
849         if (!request)
850                 RETURN(-ENOMEM);
851
852         request->rq_replen = lustre_msg_size(1, &size);
853
854         rc = ptlrpc_queue_wait(request);
855         rc = ptlrpc_check_status(request, rc);
856         if (rc)
857                 GOTO(out, rc);
858
859         osfs = lustre_msg_buf(request->rq_repmsg, 0);
860         obd_statfs_unpack(sfs, osfs);
861
862         EXIT;
863  out:
864         ptlrpc_free_req(request);
865         return 0;
866 }
867 #endif
868
869 struct obd_ops osc_obd_ops = {
870         o_setup:   osc_setup,
871         o_cleanup: osc_cleanup,
872         o_create: osc_create,
873         o_destroy: osc_destroy,
874         o_getattr: osc_getattr,
875         o_setattr: osc_setattr,
876         o_open: osc_open,
877         o_close: osc_close,
878         o_connect: osc_connect,
879         o_disconnect: osc_disconnect,
880         o_brw: osc_brw,
881         o_punch: osc_punch,
882         o_enqueue: osc_enqueue,
883         o_cancel: osc_cancel
884 };
885
886 static int __init osc_init(void)
887 {
888         return obd_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
889 }
890
891 static void __exit osc_exit(void)
892 {
893         obd_unregister_type(LUSTRE_OSC_NAME);
894 }
895
896 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
897 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
898 MODULE_LICENSE("GPL");
899
900 module_init(osc_init);
901 module_exit(osc_exit);