Whamcloud - gitweb
WARNING: we currently crash on unmount after the last phase of runtests.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/ctype.h>
28 #include <linux/init.h>
29 #include <linux/lustre_ha.h>
30 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
31 #include <linux/lustre_lite.h> /* for ll_i2info */
32
33 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, 
34                        struct lov_stripe_md *md)
35 {
36         struct ptlrpc_request *request;
37         struct ost_body *body;
38         int rc, size = sizeof(*body);
39         ENTRY;
40
41         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1, &size,
42                                   NULL);
43         if (!request)
44                 RETURN(-ENOMEM);
45
46         body = lustre_msg_buf(request->rq_reqmsg, 0);
47 #warning FIXME: pack only valid fields instead of memcpy, endianness
48         memcpy(&body->oa, oa, sizeof(*oa));
49
50         request->rq_replen = lustre_msg_size(1, &size);
51
52         rc = ptlrpc_queue_wait(request);
53         rc = ptlrpc_check_status(request, rc);
54         if (rc) {
55                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
56                 GOTO(out, rc);
57         }
58
59         body = lustre_msg_buf(request->rq_repmsg, 0);
60         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
61         if (oa)
62                 memcpy(oa, &body->oa, sizeof(*oa));
63
64         EXIT;
65  out:
66         ptlrpc_free_req(request);
67         return rc;
68 }
69
70 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
71                     struct lov_stripe_md *md)
72 {
73         struct ptlrpc_request *request;
74         struct ost_body *body;
75         int rc, size = sizeof(*body);
76         ENTRY;
77
78         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
79                                   NULL);
80         if (!request)
81                 RETURN(-ENOMEM);
82
83         body = lustre_msg_buf(request->rq_reqmsg, 0);
84 #warning FIXME: pack only valid fields instead of memcpy, endianness
85         memcpy(&body->oa, oa, sizeof(*oa));
86
87         request->rq_replen = lustre_msg_size(1, &size);
88
89         rc = ptlrpc_queue_wait(request);
90         rc = ptlrpc_check_status(request, rc);
91         if (rc)
92                 GOTO(out, rc);
93
94         body = lustre_msg_buf(request->rq_repmsg, 0);
95         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
96         if (oa)
97                 memcpy(oa, &body->oa, sizeof(*oa));
98
99         EXIT;
100  out:
101         ptlrpc_free_req(request);
102         return rc;
103 }
104
105 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
106                      struct lov_stripe_md *md)
107 {
108         struct ptlrpc_request *request;
109         struct ost_body *body;
110         int rc, size = sizeof(*body);
111         ENTRY;
112
113         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
114                                   NULL);
115         if (!request)
116                 RETURN(-ENOMEM);
117
118         body = lustre_msg_buf(request->rq_reqmsg, 0);
119 #warning FIXME: pack only valid fields instead of memcpy, endianness
120         memcpy(&body->oa, oa, sizeof(*oa));
121
122         request->rq_replen = lustre_msg_size(1, &size);
123
124         rc = ptlrpc_queue_wait(request);
125         rc = ptlrpc_check_status(request, rc);
126         if (rc)
127                 GOTO(out, rc);
128
129         body = lustre_msg_buf(request->rq_repmsg, 0);
130         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
131         if (oa)
132                 memcpy(oa, &body->oa, sizeof(*oa));
133
134         EXIT;
135  out:
136         ptlrpc_free_req(request);
137         return rc;
138 }
139
140 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
141                        struct lov_stripe_md *md)
142 {
143         struct ptlrpc_request *request;
144         struct ost_body *body;
145         int rc, size = sizeof(*body);
146         ENTRY;
147
148         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1, &size,
149                                   NULL);
150         if (!request)
151                 RETURN(-ENOMEM);
152
153         body = lustre_msg_buf(request->rq_reqmsg, 0);
154         memcpy(&body->oa, oa, sizeof(*oa));
155
156         request->rq_replen = lustre_msg_size(1, &size);
157
158         rc = ptlrpc_queue_wait(request);
159         rc = ptlrpc_check_status(request, rc);
160         GOTO(out, rc);
161
162  out:
163         ptlrpc_free_req(request);
164         return rc;
165 }
166
167 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
168                       struct lov_stripe_md **ea)
169 {
170         struct ptlrpc_request *request;
171         struct ost_body *body;
172         int rc, size = sizeof(*body);
173         ENTRY;
174
175         if (!oa) {
176                 CERROR("oa NULL\n");
177                 RETURN(-EINVAL);
178         }
179
180         if (!ea) {
181                 LBUG();
182         }
183
184         if (!*ea) {
185                 OBD_ALLOC(*ea, oa->o_easize);
186                 if (!*ea)
187                         RETURN(-ENOMEM);
188                 (*ea)->lmd_easize = oa->o_easize;
189         }
190
191         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
192                                   NULL);
193         if (!request)
194                 RETURN(-ENOMEM);
195
196         body = lustre_msg_buf(request->rq_reqmsg, 0);
197         memcpy(&body->oa, oa, sizeof(*oa));
198
199         request->rq_replen = lustre_msg_size(1, &size);
200
201         rc = ptlrpc_queue_wait(request);
202         rc = ptlrpc_check_status(request, rc);
203         if (rc)
204                 GOTO(out, rc);
205
206         body = lustre_msg_buf(request->rq_repmsg, 0);
207         memcpy(oa, &body->oa, sizeof(*oa));
208
209         (*ea)->lmd_object_id = oa->o_id;
210         (*ea)->lmd_stripe_count = 1;
211         EXIT;
212  out:
213         ptlrpc_free_req(request);
214         return rc;
215 }
216
217 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
218                      struct lov_stripe_md *md, obd_size start,
219                      obd_size end)
220 {
221         struct ptlrpc_request *request;
222         struct ost_body *body;
223         int rc, size = sizeof(*body);
224         ENTRY;
225
226         if (!oa) {
227                 CERROR("oa NULL\n");
228                 RETURN(-EINVAL);
229         }
230
231         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
232                                   NULL);
233         if (!request)
234                 RETURN(-ENOMEM);
235
236         body = lustre_msg_buf(request->rq_reqmsg, 0);
237 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
238         memcpy(&body->oa, oa, sizeof(*oa));
239
240         /* overload the blocks and size fields in the oa with start/end */ 
241 #warning FIXME: endianness, size=start, blocks=end?
242         body->oa.o_blocks = start;
243         body->oa.o_size = end;
244         body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
245
246         request->rq_replen = lustre_msg_size(1, &size);
247
248         rc = ptlrpc_queue_wait(request);
249         rc = ptlrpc_check_status(request, rc);
250         if (rc)
251                 GOTO(out, rc);
252
253         body = lustre_msg_buf(request->rq_repmsg, 0);
254         memcpy(oa, &body->oa, sizeof(*oa));
255
256         EXIT;
257  out:
258         ptlrpc_free_req(request);
259         return rc;
260 }
261
262 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
263                        struct lov_stripe_md *ea)
264 {
265         struct ptlrpc_request *request;
266         struct ost_body *body;
267         int rc, size = sizeof(*body);
268         ENTRY;
269
270         if (!oa) {
271                 CERROR("oa NULL\n");
272                 RETURN(-EINVAL);
273         }
274         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1, &size,
275                                   NULL);
276         if (!request)
277                 RETURN(-ENOMEM);
278
279         body = lustre_msg_buf(request->rq_reqmsg, 0);
280 #warning FIXME: pack only valid fields instead of memcpy, endianness
281         memcpy(&body->oa, oa, sizeof(*oa));
282
283         request->rq_replen = lustre_msg_size(1, &size);
284
285         rc = ptlrpc_queue_wait(request);
286         rc = ptlrpc_check_status(request, rc);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = lustre_msg_buf(request->rq_repmsg, 0);
291         memcpy(oa, &body->oa, sizeof(*oa));
292
293         EXIT;
294  out:
295         ptlrpc_free_req(request);
296         return rc;
297 }
298
299 struct osc_brw_cb_data {
300         brw_callback_t callback;
301         void *cb_data;
302         void *obd_data;
303         size_t obd_size;
304 };
305
306 /* Our bulk-unmapping bottom half. */
307 static void unmap_and_decref_bulk_desc(void *data)
308 {
309         struct ptlrpc_bulk_desc *desc = data;
310         struct list_head *tmp;
311         ENTRY;
312
313         /* This feels wrong to me. */
314         list_for_each(tmp, &desc->b_page_list) {
315                 struct ptlrpc_bulk_page *bulk;
316                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
317
318                 kunmap(bulk->b_page);
319         }
320
321         ptlrpc_bulk_decref(desc);
322         EXIT;
323 }
324
325 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
326 {
327         struct osc_brw_cb_data *cb_data = data;
328         int err = 0;
329         ENTRY;
330
331         if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
332                 err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS : 
333                        -ETIMEDOUT);
334         }
335
336         if (cb_data->callback)
337                 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
338
339         OBD_FREE(cb_data->obd_data, cb_data->obd_size);
340         OBD_FREE(cb_data, sizeof(*cb_data));
341
342         /* We can't kunmap the desc from interrupt context, so we do it from
343          * the bottom half above. */
344         INIT_TQUEUE(&desc->b_queue, 0, 0);
345         PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
346         schedule_task(&desc->b_queue);
347
348         EXIT;
349 }
350
351 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
352                         obd_count page_count, struct brw_page *pga,
353                         brw_callback_t callback, struct io_cb_data *data)
354 {
355         struct ptlrpc_connection *connection =
356                 client_conn2cli(conn)->cl_import.imp_connection;
357         struct ptlrpc_request *request = NULL;
358         struct ptlrpc_bulk_desc *desc = NULL;
359         struct ost_body *body;
360         struct osc_brw_cb_data *cb_data = NULL;
361         int rc, size[3] = {sizeof(*body)};
362         void *iooptr, *nioptr;
363         int mapped = 0;
364         __u32 xid;
365         ENTRY;
366
367         size[1] = sizeof(struct obd_ioobj);
368         size[2] = page_count * sizeof(struct niobuf_remote);
369
370         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
371                                   NULL);
372         if (!request)
373                 RETURN(-ENOMEM);
374
375         body = lustre_msg_buf(request->rq_reqmsg, 0);
376
377         desc = ptlrpc_prep_bulk(connection);
378         if (!desc)
379                 GOTO(out_req, rc = -ENOMEM);
380         desc->b_portal = OST_BULK_PORTAL;
381         desc->b_cb = brw_finish;
382         OBD_ALLOC(cb_data, sizeof(*cb_data));
383         if (!cb_data)
384                 GOTO(out_desc, rc = -ENOMEM);
385
386         cb_data->callback = callback;
387         cb_data->cb_data = data;
388         data->desc = desc;
389         desc->b_cb_data = cb_data;
390
391         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
392         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
393         ost_pack_ioo(&iooptr, md, page_count);
394         /* end almost identical to brw_write case */
395
396         spin_lock(&connection->c_lock);
397         xid = ++connection->c_xid_out;       /* single xid for all pages */
398         spin_unlock(&connection->c_lock);
399
400         for (mapped = 0; mapped < page_count; mapped++) {
401                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
402                 if (bulk == NULL)
403                         GOTO(out_unmap, rc = -ENOMEM);
404
405                 bulk->b_xid = xid;           /* single xid for all pages */
406
407                 bulk->b_buf = kmap(pga[mapped].pg);
408                 bulk->b_page = pga[mapped].pg;
409                 bulk->b_buflen = PAGE_SIZE;
410                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
411                                 pga[mapped].flag, bulk->b_xid);
412         }
413
414         /*
415          * Register the bulk first, because the reply could arrive out of order,
416          * and we want to be ready for the bulk data.
417          *
418          * The reference is released when brw_finish is complete.
419          *
420          * On error, we never do the brw_finish, so we handle all decrefs.
421          */
422         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
423                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
424                        OBD_FAIL_OSC_BRW_READ_BULK);
425         } else {
426                 rc = ptlrpc_register_bulk(desc);
427                 if (rc)
428                         GOTO(out_unmap, rc);
429         }
430
431         request->rq_replen = lustre_msg_size(1, size);
432         rc = ptlrpc_queue_wait(request);
433         rc = ptlrpc_check_status(request, rc);
434
435         /*
436          * XXX: If there is an error during the processing of the callback,
437          *      such as a timeout in a sleep that it performs, brw_finish
438          *      will never get called, and we'll leak the desc, fail to kunmap
439          *      things, cats will live with dogs.  One solution would be to
440          *      export brw_finish as osc_brw_finish, so that the timeout case and
441          *      its kin could call it for proper cleanup.  An alternative would
442          *      be for an error return from the callback to cause us to clean up,
443          *      but that doesn't help the truly async cases (like LOV), which
444          *      will immediately return from their PHASE_START callback, before
445          *      any such cleanup-requiring error condition can be detected.
446          */
447         if (rc)
448                 GOTO(out_req, rc);
449
450         /* Callbacks cause asynchronous handling. */
451         rc = callback(data, 0, CB_PHASE_START); 
452
453 out_req:
454         ptlrpc_req_finished(request);
455         RETURN(rc);
456
457         /* Clean up on error. */
458 out_unmap:
459         while (mapped-- > 0)
460                 kunmap(pga[mapped].pg);
461         OBD_FREE(cb_data, sizeof(*cb_data));
462 out_desc:
463         ptlrpc_bulk_decref(desc);
464         goto out_req;
465 }
466
467 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
468                          obd_count page_count, struct brw_page *pga,
469                          brw_callback_t callback, struct io_cb_data *data)
470 {
471         struct ptlrpc_connection *connection =
472                 client_conn2cli(conn)->cl_import.imp_connection;
473         struct ptlrpc_request *request = NULL;
474         struct ptlrpc_bulk_desc *desc = NULL;
475         struct ost_body *body;
476         struct niobuf_local *local = NULL;
477         struct niobuf_remote *remote;
478         struct osc_brw_cb_data *cb_data = NULL;
479         int rc, j, size[3] = {sizeof(*body)};
480         void *iooptr, *nioptr;
481         int mapped = 0;
482         ENTRY;
483
484         size[1] = sizeof(struct obd_ioobj);
485         size[2] = page_count * sizeof(*remote);
486
487         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
488                                   NULL);
489         if (!request)
490                 RETURN(-ENOMEM);
491
492         body = lustre_msg_buf(request->rq_reqmsg, 0);
493
494         desc = ptlrpc_prep_bulk(connection);
495         if (!desc)
496                 GOTO(out_req, rc = -ENOMEM);
497         desc->b_portal = OSC_BULK_PORTAL;
498         desc->b_cb = brw_finish;
499         OBD_ALLOC(cb_data, sizeof(*cb_data));
500         if (!cb_data)
501                 GOTO(out_desc, rc = -ENOMEM);
502
503         cb_data->callback = callback;
504         cb_data->cb_data = data;
505         data->desc = desc;
506         desc->b_cb_data = cb_data;
507
508         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
509         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
510         ost_pack_ioo(&iooptr, md, page_count);
511         /* end almost identical to brw_read case */
512
513         OBD_ALLOC(local, page_count * sizeof(*local));
514         if (!local)
515                 GOTO(out_cb, rc = -ENOMEM);
516
517         cb_data->obd_data = local;
518         cb_data->obd_size = page_count * sizeof(*local);
519
520         for (mapped = 0; mapped < page_count; mapped++) {
521                 local[mapped].addr = kmap(pga[mapped].pg);
522                 local[mapped].offset = pga[mapped].off;
523                 local[mapped].len = pga[mapped].count;
524                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
525                                 pga[mapped].flag, 0);
526         }
527
528         size[1] = page_count * sizeof(*remote);
529         request->rq_replen = lustre_msg_size(2, size);
530         rc = ptlrpc_queue_wait(request);
531         rc = ptlrpc_check_status(request, rc);
532         if (rc)
533                 GOTO(out_unmap, rc);
534
535         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
536         if (!nioptr)
537                 GOTO(out_unmap, rc = -EINVAL);
538
539         if (request->rq_repmsg->buflens[1] != size[1]) {
540                 CERROR("buffer length wrong (%d vs. %d)\n",
541                        request->rq_repmsg->buflens[1], size[1]);
542                 GOTO(out_unmap, rc = -EINVAL);
543         }
544
545         for (j = 0; j < page_count; j++) {
546                 struct ptlrpc_bulk_page *bulk;
547
548                 ost_unpack_niobuf(&nioptr, &remote);
549
550                 bulk = ptlrpc_prep_bulk_page(desc);
551                 if (!bulk)
552                         GOTO(out_unmap, rc = -ENOMEM);
553
554                 bulk->b_buf = (void *)(unsigned long)local[j].addr;
555                 bulk->b_buflen = local[j].len;
556                 bulk->b_xid = remote->xid;
557                 bulk->b_page = pga[j].pg;
558         }
559
560         if (desc->b_page_count != page_count)
561                 LBUG();
562
563         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
564                 GOTO(out_unmap, rc = 0);
565
566         /* Our reference is released when brw_finish is complete. */
567         rc = ptlrpc_send_bulk(desc);
568
569         /* XXX: Mike, same question as in osc_brw_read. */
570         if (rc)
571                 GOTO(out_req, rc);
572
573         /* Callbacks cause asynchronous handling. */
574         rc = callback(data, 0, CB_PHASE_START);
575
576 out_req:
577         ptlrpc_req_finished(request);
578         RETURN(rc);
579
580         /* Clean up on error. */
581 out_unmap:
582         while (mapped-- > 0)
583                 kunmap(pga[mapped].pg);
584
585         OBD_FREE(local, page_count * sizeof(*local));
586 out_cb:
587         OBD_FREE(cb_data, sizeof(*cb_data));
588 out_desc:
589         ptlrpc_bulk_decref(desc);
590         goto out_req;
591 }
592
593 static int osc_brw(int cmd, struct lustre_handle *conn,
594                    struct lov_stripe_md *md, obd_count page_count,
595                    struct brw_page *pga, brw_callback_t callback,
596                    struct io_cb_data *data)
597 {
598         if (cmd & OBD_BRW_WRITE)
599                 return osc_brw_write(conn, md, page_count, pga, callback, data);
600         else
601                 return osc_brw_read(conn, md, page_count, pga, callback, data);
602 }
603
604 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, 
605                        struct lustre_handle *parent_lock, 
606                        __u32 type, void *extentp, int extent_len, __u32 mode,
607                        int *flags, void *callback, void *data, int datalen,
608                        struct lustre_handle *lockh)
609 {
610         __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
611         struct obd_device *obddev = class_conn2obd(connh);
612         struct ldlm_extent *extent = extentp;
613         int rc;
614         __u32 mode2;
615
616         /* Filesystem locks are given a bit of special treatment: first we
617          * fixup the lock to start and end on page boundaries. */
618         extent->start &= PAGE_MASK;
619         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
620
621         /* Next, search for already existing extent locks that will cover us */
622         //osc_con2dlmcl(conn, &cl, &connection, &rconn);
623         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
624                              sizeof(extent), mode, lockh);
625         if (rc == 1) {
626                 /* We already have a lock, and it's referenced */
627                 return 0;
628         }
629
630         /* Next, search for locks that we can upgrade (if we're trying to write)
631          * or are more than we need (if we're trying to read).  Because the VFS
632          * and page cache already protect us locally, lots of readers/writers
633          * can share a single PW lock. */
634         if (mode == LCK_PW)
635                 mode2 = LCK_PR;
636         else
637                 mode2 = LCK_PW;
638
639         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
640                              sizeof(extent), mode2, lockh);
641         if (rc == 1) {
642                 int flags;
643                 /* FIXME: This is not incredibly elegant, but it might
644                  * be more elegant than adding another parameter to
645                  * lock_match.  I want a second opinion. */
646                 ldlm_lock_addref(lockh, mode);
647                 ldlm_lock_decref(lockh, mode2);
648
649                 if (mode == LCK_PR)
650                         return 0;
651
652                 rc = ldlm_cli_convert(lockh, mode, &flags);
653                 if (rc)
654                         LBUG();
655
656                 return rc;
657         }
658
659         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
660                               parent_lock, res_id, type, extent,
661                               sizeof(extent), mode, flags, ldlm_completion_ast,
662                               callback, data, datalen, lockh);
663         return rc;
664 }
665
666 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
667                       __u32 mode, struct lustre_handle *lockh)
668 {
669         ENTRY;
670
671         ldlm_lock_decref(lockh, mode);
672
673         RETURN(0);
674 }
675
676 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
677 {
678         struct ptlrpc_request *request;
679         struct obd_statfs *osfs;
680         int rc, size = sizeof(*osfs);
681         ENTRY;
682
683         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
684                                   NULL);
685         if (!request)
686                 RETURN(-ENOMEM);
687
688         request->rq_replen = lustre_msg_size(1, &size);
689
690         rc = ptlrpc_queue_wait(request);
691         rc = ptlrpc_check_status(request, rc);
692         if (rc) {
693                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
694                 GOTO(out, rc);
695         }
696
697         osfs = lustre_msg_buf(request->rq_repmsg, 0);
698         obd_statfs_unpack(osfs, sfs);
699
700         EXIT;
701  out:
702         ptlrpc_free_req(request);
703         return rc;
704 }
705
706 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
707                          void *karg, void *uarg)
708 {
709         struct obd_device *obddev = class_conn2obd(conn);
710         struct obd_ioctl_data *data = karg;
711         int err = 0;
712         ENTRY;
713
714         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < 
715                         IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
716                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
717                         _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
718                 RETURN(-EINVAL);
719         }
720
721         switch (cmd) {
722         case IOC_LDLM_TEST: {
723                 err = ldlm_test(obddev, conn);
724                 CERROR("-- done err %d\n", err);
725                 GOTO(out, err);
726         }
727         case IOC_LDLM_REGRESS_START: {
728                 unsigned int numthreads = 1; 
729                 unsigned int numheld = 10; 
730                 unsigned int numres = 10; 
731                 unsigned int numext = 10;
732                 char *parse;
733                 
734                 if (data->ioc_inllen1) {
735                         parse = data->ioc_inlbuf1;
736                         if (*parse != '\0') {
737                                 while(isspace(*parse)) parse++;
738                                 numthreads = simple_strtoul(parse, &parse, 0);
739                                 while(isspace(*parse)) parse++;
740                         }
741                         if (*parse != '\0') {
742                                 while(isspace(*parse)) parse++;
743                                 numheld = simple_strtoul(parse, &parse, 0);
744                                 while(isspace(*parse)) parse++;
745                         }
746                         if (*parse != '\0') {
747                                 while(isspace(*parse)) parse++;
748                                 numres = simple_strtoul(parse, &parse, 0);
749                                 while(isspace(*parse)) parse++;
750                         }
751                         if (*parse != '\0') {
752                                 while(isspace(*parse)) parse++;
753                                 numext = simple_strtoul(parse, &parse, 0);
754                                 while(isspace(*parse)) parse++;
755                         }
756                 }
757
758                 err = ldlm_regression_start(obddev, conn, numthreads, 
759                                 numheld, numres, numext);
760
761                 CERROR("-- done err %d\n", err);
762                 GOTO(out, err);
763         }
764         case IOC_LDLM_REGRESS_STOP: {
765                 err = ldlm_regression_stop();
766                 CERROR("-- done err %d\n", err);
767                 GOTO(out, err);
768         }
769         default:
770                 GOTO(out, err = -EINVAL);
771         }
772 out:
773         return err;
774 }
775
776 struct obd_ops osc_obd_ops = {
777         o_setup:        client_obd_setup,
778         o_cleanup:      client_obd_cleanup,
779         o_statfs:       osc_statfs,
780         o_create:       osc_create,
781         o_destroy:      osc_destroy,
782         o_getattr:      osc_getattr,
783         o_setattr:      osc_setattr,
784         o_open:         osc_open,
785         o_close:        osc_close,
786         o_connect:      client_obd_connect,
787         o_disconnect:   client_obd_disconnect,
788         o_brw:          osc_brw,
789         o_punch:        osc_punch,
790         o_enqueue:      osc_enqueue,
791         o_cancel:       osc_cancel,
792         o_iocontrol:    osc_iocontrol
793 };
794
795 static int __init osc_init(void)
796 {
797         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
798 }
799
800 static void __exit osc_exit(void)
801 {
802         class_unregister_type(LUSTRE_OSC_NAME);
803 }
804
805 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
806 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
807 MODULE_LICENSE("GPL");
808
809 module_init(osc_init);
810 module_exit(osc_exit);