Whamcloud - gitweb
- change field names for clarity.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/ctype.h>
28 #include <linux/init.h>
29 #include <linux/lustre_ha.h>
30 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
31 #include <linux/lustre_lite.h> /* for ll_i2info */
32
33 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
34                        struct lov_stripe_md *md)
35 {
36         struct ptlrpc_request *request;
37         struct ost_body *body;
38         int rc, size = sizeof(*body);
39         ENTRY;
40
41         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
42                                   &size, NULL);
43         if (!request)
44                 RETURN(-ENOMEM);
45
46         body = lustre_msg_buf(request->rq_reqmsg, 0);
47 #warning FIXME: pack only valid fields instead of memcpy, endianness
48         memcpy(&body->oa, oa, sizeof(*oa));
49
50         request->rq_replen = lustre_msg_size(1, &size);
51
52         rc = ptlrpc_queue_wait(request);
53         rc = ptlrpc_check_status(request, rc);
54         if (rc) {
55                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
56                 GOTO(out, rc);
57         }
58
59         body = lustre_msg_buf(request->rq_repmsg, 0);
60         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
61         if (oa)
62                 memcpy(oa, &body->oa, sizeof(*oa));
63
64         EXIT;
65  out:
66         ptlrpc_free_req(request);
67         return rc;
68 }
69
70 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
71                     struct lov_stripe_md *md)
72 {
73         struct ptlrpc_request *request;
74         struct ost_body *body;
75         int rc, size = sizeof(*body);
76         ENTRY;
77
78         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
79                                   NULL);
80         if (!request)
81                 RETURN(-ENOMEM);
82
83         body = lustre_msg_buf(request->rq_reqmsg, 0);
84 #warning FIXME: pack only valid fields instead of memcpy, endianness
85         memcpy(&body->oa, oa, sizeof(*oa));
86
87         request->rq_replen = lustre_msg_size(1, &size);
88
89         rc = ptlrpc_queue_wait(request);
90         rc = ptlrpc_check_status(request, rc);
91         if (rc)
92                 GOTO(out, rc);
93
94         body = lustre_msg_buf(request->rq_repmsg, 0);
95         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
96         if (oa)
97                 memcpy(oa, &body->oa, sizeof(*oa));
98
99         EXIT;
100  out:
101         ptlrpc_free_req(request);
102         return rc;
103 }
104
105 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
106                      struct lov_stripe_md *md)
107 {
108         struct ptlrpc_request *request;
109         struct ost_body *body;
110         int rc, size = sizeof(*body);
111         ENTRY;
112
113         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
114                                   NULL);
115         if (!request)
116                 RETURN(-ENOMEM);
117
118         body = lustre_msg_buf(request->rq_reqmsg, 0);
119 #warning FIXME: pack only valid fields instead of memcpy, endianness
120         memcpy(&body->oa, oa, sizeof(*oa));
121
122         request->rq_replen = lustre_msg_size(1, &size);
123
124         rc = ptlrpc_queue_wait(request);
125         rc = ptlrpc_check_status(request, rc);
126         if (rc)
127                 GOTO(out, rc);
128
129         body = lustre_msg_buf(request->rq_repmsg, 0);
130         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
131         if (oa)
132                 memcpy(oa, &body->oa, sizeof(*oa));
133
134         EXIT;
135  out:
136         ptlrpc_free_req(request);
137         return rc;
138 }
139
140 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
141                        struct lov_stripe_md *md)
142 {
143         struct ptlrpc_request *request;
144         struct ost_body *body;
145         int rc, size = sizeof(*body);
146         ENTRY;
147
148         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
149                                   &size, NULL);
150         if (!request)
151                 RETURN(-ENOMEM);
152
153         body = lustre_msg_buf(request->rq_reqmsg, 0);
154         memcpy(&body->oa, oa, sizeof(*oa));
155
156         request->rq_replen = lustre_msg_size(1, &size);
157
158         rc = ptlrpc_queue_wait(request);
159         rc = ptlrpc_check_status(request, rc);
160
161         ptlrpc_free_req(request);
162         return rc;
163 }
164
165 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
166                       struct lov_stripe_md **ea)
167 {
168         struct ptlrpc_request *request;
169         struct ost_body *body;
170         int rc, size = sizeof(*body);
171         ENTRY;
172
173         if (!oa) {
174                 CERROR("oa NULL\n");
175                 RETURN(-EINVAL);
176         }
177
178         if (!ea) {
179                 LBUG();
180         }
181
182         if (!*ea) {
183                 OBD_ALLOC(*ea, oa->o_easize);
184                 if (!*ea)
185                         RETURN(-ENOMEM);
186                 (*ea)->lmd_mds_easize = oa->o_easize;
187         }
188
189         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
190                                   NULL);
191         if (!request)
192                 RETURN(-ENOMEM);
193
194         body = lustre_msg_buf(request->rq_reqmsg, 0);
195         memcpy(&body->oa, oa, sizeof(*oa));
196
197         request->rq_replen = lustre_msg_size(1, &size);
198
199         rc = ptlrpc_queue_wait(request);
200         rc = ptlrpc_check_status(request, rc);
201         if (rc)
202                 GOTO(out, rc);
203
204         body = lustre_msg_buf(request->rq_repmsg, 0);
205         memcpy(oa, &body->oa, sizeof(*oa));
206
207         (*ea)->lmd_object_id = oa->o_id;
208         (*ea)->lmd_stripe_count = 1;
209         EXIT;
210  out:
211         ptlrpc_free_req(request);
212         return rc;
213 }
214
215 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
216                      struct lov_stripe_md *md, obd_size start,
217                      obd_size end)
218 {
219         struct ptlrpc_request *request;
220         struct ost_body *body;
221         int rc, size = sizeof(*body);
222         ENTRY;
223
224         if (!oa) {
225                 CERROR("oa NULL\n");
226                 RETURN(-EINVAL);
227         }
228
229         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
230                                   NULL);
231         if (!request)
232                 RETURN(-ENOMEM);
233
234         body = lustre_msg_buf(request->rq_reqmsg, 0);
235 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
236         memcpy(&body->oa, oa, sizeof(*oa));
237
238         /* overload the blocks and size fields in the oa with start/end */
239 #warning FIXME: endianness, size=start, blocks=end?
240         body->oa.o_blocks = start;
241         body->oa.o_size = end;
242         body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
243
244         request->rq_replen = lustre_msg_size(1, &size);
245
246         rc = ptlrpc_queue_wait(request);
247         rc = ptlrpc_check_status(request, rc);
248         if (rc)
249                 GOTO(out, rc);
250
251         body = lustre_msg_buf(request->rq_repmsg, 0);
252         memcpy(oa, &body->oa, sizeof(*oa));
253
254         EXIT;
255  out:
256         ptlrpc_free_req(request);
257         return rc;
258 }
259
260 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
261                        struct lov_stripe_md *ea)
262 {
263         struct ptlrpc_request *request;
264         struct ost_body *body;
265         int rc, size = sizeof(*body);
266         ENTRY;
267
268         if (!oa) {
269                 CERROR("oa NULL\n");
270                 RETURN(-EINVAL);
271         }
272         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
273                                   &size, NULL);
274         if (!request)
275                 RETURN(-ENOMEM);
276
277         body = lustre_msg_buf(request->rq_reqmsg, 0);
278 #warning FIXME: pack only valid fields instead of memcpy, endianness
279         memcpy(&body->oa, oa, sizeof(*oa));
280
281         request->rq_replen = lustre_msg_size(1, &size);
282
283         rc = ptlrpc_queue_wait(request);
284         rc = ptlrpc_check_status(request, rc);
285         if (rc)
286                 GOTO(out, rc);
287
288         body = lustre_msg_buf(request->rq_repmsg, 0);
289         memcpy(oa, &body->oa, sizeof(*oa));
290
291         EXIT;
292  out:
293         ptlrpc_free_req(request);
294         return rc;
295 }
296
297 struct osc_brw_cb_data {
298         brw_callback_t callback;
299         void *cb_data;
300         void *obd_data;
301         size_t obd_size;
302 };
303
304 /* Our bulk-unmapping bottom half. */
305 static void unmap_and_decref_bulk_desc(void *data)
306 {
307         struct ptlrpc_bulk_desc *desc = data;
308         struct list_head *tmp;
309         ENTRY;
310
311         /* This feels wrong to me. */
312         list_for_each(tmp, &desc->bd_page_list) {
313                 struct ptlrpc_bulk_page *bulk;
314                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
315
316                 kunmap(bulk->bp_page);
317         }
318
319         ptlrpc_bulk_decref(desc);
320         EXIT;
321 }
322
323 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
324 {
325         struct osc_brw_cb_data *cb_data = data;
326         int err = 0;
327         ENTRY;
328
329         if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
330                 err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
331                        -ETIMEDOUT);
332         }
333
334         if (cb_data->callback)
335                 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
336
337         OBD_FREE(cb_data->obd_data, cb_data->obd_size);
338         OBD_FREE(cb_data, sizeof(*cb_data));
339
340         /* We can't kunmap the desc from interrupt context, so we do it from
341          * the bottom half above. */
342         INIT_TQUEUE(&desc->bd_queue, 0, 0);
343         PREPARE_TQUEUE(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
344         schedule_task(&desc->bd_queue);
345
346         EXIT;
347 }
348
349 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
350                         obd_count page_count, struct brw_page *pga,
351                         brw_callback_t callback, struct io_cb_data *data)
352 {
353         struct ptlrpc_connection *connection =
354                 client_conn2cli(conn)->cl_import.imp_connection;
355         struct ptlrpc_request *request = NULL;
356         struct ptlrpc_bulk_desc *desc = NULL;
357         struct ost_body *body;
358         struct osc_brw_cb_data *cb_data = NULL;
359         int rc, size[3] = {sizeof(*body)};
360         void *iooptr, *nioptr;
361         int mapped = 0;
362         __u32 xid;
363         ENTRY;
364
365         size[1] = sizeof(struct obd_ioobj);
366         size[2] = page_count * sizeof(struct niobuf_remote);
367
368         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
369                                   NULL);
370         if (!request)
371                 RETURN(-ENOMEM);
372
373         body = lustre_msg_buf(request->rq_reqmsg, 0);
374
375         desc = ptlrpc_prep_bulk(connection);
376         if (!desc)
377                 GOTO(out_req, rc = -ENOMEM);
378         desc->bd_portal = OST_BULK_PORTAL;
379         desc->bd_cb = brw_finish;
380         OBD_ALLOC(cb_data, sizeof(*cb_data));
381         if (!cb_data)
382                 GOTO(out_desc, rc = -ENOMEM);
383
384         cb_data->callback = callback;
385         cb_data->cb_data = data;
386         data->desc = desc;
387         desc->bd_cb_data = cb_data;
388
389         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
390         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
391         ost_pack_ioo(&iooptr, md, page_count);
392         /* end almost identical to brw_write case */
393
394         spin_lock(&connection->c_lock);
395         xid = ++connection->c_xid_out;       /* single xid for all pages */
396         spin_unlock(&connection->c_lock);
397
398         for (mapped = 0; mapped < page_count; mapped++) {
399                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
400                 if (bulk == NULL)
401                         GOTO(out_unmap, rc = -ENOMEM);
402
403                 bulk->bp_xid = xid;           /* single xid for all pages */
404
405                 bulk->bp_buf = kmap(pga[mapped].pg);
406                 bulk->bp_page = pga[mapped].pg;
407                 bulk->bp_buflen = PAGE_SIZE;
408                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
409                                 pga[mapped].flag, bulk->bp_xid);
410         }
411
412         /*
413          * Register the bulk first, because the reply could arrive out of order,
414          * and we want to be ready for the bulk data.
415          *
416          * The reference is released when brw_finish is complete.
417          *
418          * On error, we never do the brw_finish, so we handle all decrefs.
419          */
420         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
421                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
422                        OBD_FAIL_OSC_BRW_READ_BULK);
423         } else {
424                 rc = ptlrpc_register_bulk(desc);
425                 if (rc)
426                         GOTO(out_unmap, rc);
427         }
428
429         request->rq_replen = lustre_msg_size(1, size);
430         rc = ptlrpc_queue_wait(request);
431         rc = ptlrpc_check_status(request, rc);
432
433         /*
434          * XXX: If there is an error during the processing of the callback,
435          *      such as a timeout in a sleep that it performs, brw_finish
436          *      will never get called, and we'll leak the desc, fail to kunmap
437          *      things, cats will live with dogs.  One solution would be to
438          *      export brw_finish as osc_brw_finish, so that the timeout case
439          *      and its kin could call it for proper cleanup.  An alternative
440          *      would be for an error return from the callback to cause us to
441          *      clean up, but that doesn't help the truly async cases (like
442          *      LOV), which will immediately return from their PHASE_START
443          *      callback, before any such cleanup-requiring error condition can
444          *      be detected.
445          */
446         if (rc)
447                 GOTO(out_req, rc);
448
449         /* Callbacks cause asynchronous handling. */
450         rc = callback(data, 0, CB_PHASE_START);
451
452 out_req:
453         ptlrpc_req_finished(request);
454         RETURN(rc);
455
456         /* Clean up on error. */
457 out_unmap:
458         while (mapped-- > 0)
459                 kunmap(pga[mapped].pg);
460         OBD_FREE(cb_data, sizeof(*cb_data));
461 out_desc:
462         ptlrpc_bulk_decref(desc);
463         goto out_req;
464 }
465
466 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
467                          obd_count page_count, struct brw_page *pga,
468                          brw_callback_t callback, struct io_cb_data *data)
469 {
470         struct ptlrpc_connection *connection =
471                 client_conn2cli(conn)->cl_import.imp_connection;
472         struct ptlrpc_request *request = NULL;
473         struct ptlrpc_bulk_desc *desc = NULL;
474         struct ost_body *body;
475         struct niobuf_local *local = NULL;
476         struct niobuf_remote *remote;
477         struct osc_brw_cb_data *cb_data = NULL;
478         int rc, j, size[3] = {sizeof(*body)};
479         void *iooptr, *nioptr;
480         int mapped = 0;
481         ENTRY;
482
483         size[1] = sizeof(struct obd_ioobj);
484         size[2] = page_count * sizeof(*remote);
485
486         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
487                                   NULL);
488         if (!request)
489                 RETURN(-ENOMEM);
490
491         body = lustre_msg_buf(request->rq_reqmsg, 0);
492
493         desc = ptlrpc_prep_bulk(connection);
494         if (!desc)
495                 GOTO(out_req, rc = -ENOMEM);
496         desc->bd_portal = OSC_BULK_PORTAL;
497         desc->bd_cb = brw_finish;
498         OBD_ALLOC(cb_data, sizeof(*cb_data));
499         if (!cb_data)
500                 GOTO(out_desc, rc = -ENOMEM);
501
502         cb_data->callback = callback;
503         cb_data->cb_data = data;
504         data->desc = desc;
505         desc->bd_cb_data = cb_data;
506
507         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
508         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
509         ost_pack_ioo(&iooptr, md, page_count);
510         /* end almost identical to brw_read case */
511
512         OBD_ALLOC(local, page_count * sizeof(*local));
513         if (!local)
514                 GOTO(out_cb, rc = -ENOMEM);
515
516         cb_data->obd_data = local;
517         cb_data->obd_size = page_count * sizeof(*local);
518
519         for (mapped = 0; mapped < page_count; mapped++) {
520                 local[mapped].addr = kmap(pga[mapped].pg);
521
522                 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
523                        "%d ; page %d of %d\n",
524                        local[mapped].addr, pga[mapped].pg->flags,
525                        page_count(pga[mapped].pg), 
526                        mapped, page_count - 1);
527
528                 local[mapped].offset = pga[mapped].off;
529                 local[mapped].len = pga[mapped].count;
530                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
531                                 pga[mapped].flag, 0);
532         }
533
534         size[1] = page_count * sizeof(*remote);
535         request->rq_replen = lustre_msg_size(2, size);
536         rc = ptlrpc_queue_wait(request);
537         rc = ptlrpc_check_status(request, rc);
538         if (rc)
539                 GOTO(out_unmap, rc);
540
541         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
542         if (!nioptr)
543                 GOTO(out_unmap, rc = -EINVAL);
544
545         if (request->rq_repmsg->buflens[1] != size[1]) {
546                 CERROR("buffer length wrong (%d vs. %d)\n",
547                        request->rq_repmsg->buflens[1], size[1]);
548                 GOTO(out_unmap, rc = -EINVAL);
549         }
550
551         for (j = 0; j < page_count; j++) {
552                 struct ptlrpc_bulk_page *bulk;
553
554                 ost_unpack_niobuf(&nioptr, &remote);
555
556                 bulk = ptlrpc_prep_bulk_page(desc);
557                 if (!bulk)
558                         GOTO(out_unmap, rc = -ENOMEM);
559
560                 bulk->bp_buf = (void *)(unsigned long)local[j].addr;
561                 bulk->bp_buflen = local[j].len;
562                 bulk->bp_xid = remote->xid;
563                 bulk->bp_page = pga[j].pg;
564         }
565
566         if (desc->bd_page_count != page_count)
567                 LBUG();
568
569         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
570                 GOTO(out_unmap, rc = 0);
571
572         /* Our reference is released when brw_finish is complete. */
573         rc = ptlrpc_send_bulk(desc);
574
575         /* XXX: Mike, same question as in osc_brw_read. */
576         if (rc)
577                 GOTO(out_req, rc);
578
579         /* Callbacks cause asynchronous handling. */
580         rc = callback(data, 0, CB_PHASE_START);
581
582 out_req:
583         ptlrpc_req_finished(request);
584         RETURN(rc);
585
586         /* Clean up on error. */
587 out_unmap:
588         while (mapped-- > 0)
589                 kunmap(pga[mapped].pg);
590
591         OBD_FREE(local, page_count * sizeof(*local));
592 out_cb:
593         OBD_FREE(cb_data, sizeof(*cb_data));
594 out_desc:
595         ptlrpc_bulk_decref(desc);
596         goto out_req;
597 }
598
599 static int osc_brw(int cmd, struct lustre_handle *conn,
600                    struct lov_stripe_md *md, obd_count page_count,
601                    struct brw_page *pga, brw_callback_t callback,
602                    struct io_cb_data *data)
603 {
604         if (cmd & OBD_BRW_WRITE)
605                 return osc_brw_write(conn, md, page_count, pga, callback, data);
606         else
607                 return osc_brw_read(conn, md, page_count, pga, callback, data);
608 }
609
610 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md,
611                        struct lustre_handle *parent_lock,
612                        __u32 type, void *extentp, int extent_len, __u32 mode,
613                        int *flags, void *callback, void *data, int datalen,
614                        struct lustre_handle *lockh)
615 {
616         __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
617         struct obd_device *obddev = class_conn2obd(connh);
618         struct ldlm_extent *extent = extentp;
619         int rc;
620         __u32 mode2;
621
622         /* Filesystem locks are given a bit of special treatment: first we
623          * fixup the lock to start and end on page boundaries. */
624         extent->start &= PAGE_MASK;
625         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
626
627         /* Next, search for already existing extent locks that will cover us */
628         //osc_con2dlmcl(conn, &cl, &connection, &rconn);
629         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
630                              sizeof(extent), mode, lockh);
631         if (rc == 1) {
632                 /* We already have a lock, and it's referenced */
633                 return 0;
634         }
635
636         /* Next, search for locks that we can upgrade (if we're trying to write)
637          * or are more than we need (if we're trying to read).  Because the VFS
638          * and page cache already protect us locally, lots of readers/writers
639          * can share a single PW lock. */
640         if (mode == LCK_PW)
641                 mode2 = LCK_PR;
642         else
643                 mode2 = LCK_PW;
644
645         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
646                              sizeof(extent), mode2, lockh);
647         if (rc == 1) {
648                 int flags;
649                 /* FIXME: This is not incredibly elegant, but it might
650                  * be more elegant than adding another parameter to
651                  * lock_match.  I want a second opinion. */
652                 ldlm_lock_addref(lockh, mode);
653                 ldlm_lock_decref(lockh, mode2);
654
655                 if (mode == LCK_PR)
656                         return 0;
657
658                 rc = ldlm_cli_convert(lockh, mode, &flags);
659                 if (rc)
660                         LBUG();
661
662                 return rc;
663         }
664
665         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
666                               parent_lock, res_id, type, extent,
667                               sizeof(extent), mode, flags, ldlm_completion_ast,
668                               callback, data, datalen, lockh);
669         return rc;
670 }
671
672 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
673                       __u32 mode, struct lustre_handle *lockh)
674 {
675         ENTRY;
676
677         ldlm_lock_decref(lockh, mode);
678
679         RETURN(0);
680 }
681
682 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
683 {
684         struct ptlrpc_request *request;
685         int rc, size = sizeof(*osfs);
686         ENTRY;
687
688         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
689                                   NULL);
690         if (!request)
691                 RETURN(-ENOMEM);
692
693         request->rq_replen = lustre_msg_size(1, &size);
694
695         rc = ptlrpc_queue_wait(request);
696         rc = ptlrpc_check_status(request, rc);
697         if (rc) {
698                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
699                 GOTO(out, rc);
700         }
701
702         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
703
704         EXIT;
705  out:
706         ptlrpc_free_req(request);
707         return rc;
708 }
709
710 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
711                          void *karg, void *uarg)
712 {
713         struct obd_device *obddev = class_conn2obd(conn);
714         struct obd_ioctl_data *data = karg;
715         int err = 0;
716         ENTRY;
717
718         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
719             _IOC_NR(cmd) < IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
720                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
721                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
722                 RETURN(-EINVAL);
723         }
724
725         switch (cmd) {
726         case IOC_LDLM_TEST: {
727                 err = ldlm_test(obddev, conn);
728                 CERROR("-- done err %d\n", err);
729                 GOTO(out, err);
730         }
731         case IOC_LDLM_REGRESS_START: {
732                 unsigned int numthreads = 1;
733                 unsigned int numheld = 10;
734                 unsigned int numres = 10;
735                 unsigned int numext = 10;
736                 char *parse;
737
738                 if (data->ioc_inllen1) {
739                         parse = data->ioc_inlbuf1;
740                         if (*parse != '\0') {
741                                 while(isspace(*parse)) parse++;
742                                 numthreads = simple_strtoul(parse, &parse, 0);
743                                 while(isspace(*parse)) parse++;
744                         }
745                         if (*parse != '\0') {
746                                 while(isspace(*parse)) parse++;
747                                 numheld = simple_strtoul(parse, &parse, 0);
748                                 while(isspace(*parse)) parse++;
749                         }
750                         if (*parse != '\0') {
751                                 while(isspace(*parse)) parse++;
752                                 numres = simple_strtoul(parse, &parse, 0);
753                                 while(isspace(*parse)) parse++;
754                         }
755                         if (*parse != '\0') {
756                                 while(isspace(*parse)) parse++;
757                                 numext = simple_strtoul(parse, &parse, 0);
758                                 while(isspace(*parse)) parse++;
759                         }
760                 }
761
762                 err = ldlm_regression_start(obddev, conn, numthreads,
763                                 numheld, numres, numext);
764
765                 CERROR("-- done err %d\n", err);
766                 GOTO(out, err);
767         }
768         case IOC_LDLM_REGRESS_STOP: {
769                 err = ldlm_regression_stop();
770                 CERROR("-- done err %d\n", err);
771                 GOTO(out, err);
772         }
773         default:
774                 GOTO(out, err = -EINVAL);
775         }
776 out:
777         return err;
778 }
779
780 struct obd_ops osc_obd_ops = {
781         o_setup:        client_obd_setup,
782         o_cleanup:      client_obd_cleanup,
783         o_statfs:       osc_statfs,
784         o_create:       osc_create,
785         o_destroy:      osc_destroy,
786         o_getattr:      osc_getattr,
787         o_setattr:      osc_setattr,
788         o_open:         osc_open,
789         o_close:        osc_close,
790         o_connect:      client_obd_connect,
791         o_disconnect:   client_obd_disconnect,
792         o_brw:          osc_brw,
793         o_punch:        osc_punch,
794         o_enqueue:      osc_enqueue,
795         o_cancel:       osc_cancel,
796         o_iocontrol:    osc_iocontrol
797 };
798
799 static int __init osc_init(void)
800 {
801         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
802 }
803
804 static void __exit osc_exit(void)
805 {
806         class_unregister_type(LUSTRE_OSC_NAME);
807 }
808
809 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
810 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
811 MODULE_LICENSE("GPL");
812
813 module_init(osc_init);
814 module_exit(osc_exit);