Whamcloud - gitweb
- Adds an int 'local_only' to the cancel_unused function and methods.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/ctype.h>
28 #include <linux/init.h>
29 #include <linux/lustre_ha.h>
30 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
31 #include <linux/lustre_lite.h> /* for ll_i2info */
32
33 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
34                        struct lov_stripe_md *md)
35 {
36         struct ptlrpc_request *request;
37         struct ost_body *body;
38         int rc, size = sizeof(*body);
39         ENTRY;
40
41         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
42                                   &size, NULL);
43         if (!request)
44                 RETURN(-ENOMEM);
45
46         body = lustre_msg_buf(request->rq_reqmsg, 0);
47 #warning FIXME: pack only valid fields instead of memcpy, endianness
48         memcpy(&body->oa, oa, sizeof(*oa));
49
50         request->rq_replen = lustre_msg_size(1, &size);
51
52         rc = ptlrpc_queue_wait(request);
53         rc = ptlrpc_check_status(request, rc);
54         if (rc) {
55                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
56                 GOTO(out, rc);
57         }
58
59         body = lustre_msg_buf(request->rq_repmsg, 0);
60         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
61         if (oa)
62                 memcpy(oa, &body->oa, sizeof(*oa));
63
64         EXIT;
65  out:
66         ptlrpc_free_req(request);
67         return rc;
68 }
69
70 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
71                     struct lov_stripe_md *md)
72 {
73         struct ptlrpc_request *request;
74         struct ost_body *body;
75         int rc, size = sizeof(*body);
76         ENTRY;
77
78         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
79                                   NULL);
80         if (!request)
81                 RETURN(-ENOMEM);
82
83         body = lustre_msg_buf(request->rq_reqmsg, 0);
84 #warning FIXME: pack only valid fields instead of memcpy, endianness
85         memcpy(&body->oa, oa, sizeof(*oa));
86
87         request->rq_replen = lustre_msg_size(1, &size);
88
89         rc = ptlrpc_queue_wait(request);
90         rc = ptlrpc_check_status(request, rc);
91         if (rc)
92                 GOTO(out, rc);
93
94         body = lustre_msg_buf(request->rq_repmsg, 0);
95         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
96         if (oa)
97                 memcpy(oa, &body->oa, sizeof(*oa));
98
99         EXIT;
100  out:
101         ptlrpc_free_req(request);
102         return rc;
103 }
104
105 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
106                      struct lov_stripe_md *md)
107 {
108         struct ptlrpc_request *request;
109         struct ost_body *body;
110         int rc, size = sizeof(*body);
111         ENTRY;
112
113         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
114                                   NULL);
115         if (!request)
116                 RETURN(-ENOMEM);
117
118         body = lustre_msg_buf(request->rq_reqmsg, 0);
119 #warning FIXME: pack only valid fields instead of memcpy, endianness
120         memcpy(&body->oa, oa, sizeof(*oa));
121
122         request->rq_replen = lustre_msg_size(1, &size);
123
124         rc = ptlrpc_queue_wait(request);
125         rc = ptlrpc_check_status(request, rc);
126         if (rc)
127                 GOTO(out, rc);
128
129         body = lustre_msg_buf(request->rq_repmsg, 0);
130         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
131         if (oa)
132                 memcpy(oa, &body->oa, sizeof(*oa));
133
134         EXIT;
135  out:
136         ptlrpc_free_req(request);
137         return rc;
138 }
139
140 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
141                        struct lov_stripe_md *md)
142 {
143         struct ptlrpc_request *request;
144         struct ost_body *body;
145         int rc, size = sizeof(*body);
146         ENTRY;
147
148         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
149                                   &size, NULL);
150         if (!request)
151                 RETURN(-ENOMEM);
152
153         body = lustre_msg_buf(request->rq_reqmsg, 0);
154         memcpy(&body->oa, oa, sizeof(*oa));
155
156         request->rq_replen = lustre_msg_size(1, &size);
157
158         rc = ptlrpc_queue_wait(request);
159         rc = ptlrpc_check_status(request, rc);
160
161         ptlrpc_free_req(request);
162         return rc;
163 }
164
165 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
166                       struct lov_stripe_md **ea)
167 {
168         struct ptlrpc_request *request;
169         struct ost_body *body;
170         int rc, size = sizeof(*body);
171         ENTRY;
172
173         if (!oa) {
174                 CERROR("oa NULL\n");
175                 RETURN(-EINVAL);
176         }
177
178         if (!ea) {
179                 LBUG();
180         }
181
182         if (!*ea) {
183                 // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
184                 OBD_ALLOC(*ea, oa->o_easize);
185                 if (!*ea)
186                         RETURN(-ENOMEM);
187                 (*ea)->lsm_mds_easize = oa->o_easize;
188         }
189
190         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
191                                   NULL);
192         if (!request)
193                 RETURN(-ENOMEM);
194
195         body = lustre_msg_buf(request->rq_reqmsg, 0);
196         memcpy(&body->oa, oa, sizeof(*oa));
197
198         request->rq_replen = lustre_msg_size(1, &size);
199
200         rc = ptlrpc_queue_wait(request);
201         rc = ptlrpc_check_status(request, rc);
202         if (rc)
203                 GOTO(out, rc);
204
205         body = lustre_msg_buf(request->rq_repmsg, 0);
206         memcpy(oa, &body->oa, sizeof(*oa));
207
208         (*ea)->lsm_object_id = oa->o_id;
209         (*ea)->lsm_stripe_count = 0;
210         EXIT;
211  out:
212         ptlrpc_free_req(request);
213         return rc;
214 }
215
216 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
217                      struct lov_stripe_md *md, obd_size start,
218                      obd_size end)
219 {
220         struct ptlrpc_request *request;
221         struct ost_body *body;
222         int rc, size = sizeof(*body);
223         ENTRY;
224
225         if (!oa) {
226                 CERROR("oa NULL\n");
227                 RETURN(-EINVAL);
228         }
229
230         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
231                                   NULL);
232         if (!request)
233                 RETURN(-ENOMEM);
234
235         body = lustre_msg_buf(request->rq_reqmsg, 0);
236 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
237         memcpy(&body->oa, oa, sizeof(*oa));
238
239         /* overload the blocks and size fields in the oa with start/end */
240 #warning FIXME: endianness, size=start, blocks=end?
241         body->oa.o_blocks = start;
242         body->oa.o_size = end;
243         body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
244
245         request->rq_replen = lustre_msg_size(1, &size);
246
247         rc = ptlrpc_queue_wait(request);
248         rc = ptlrpc_check_status(request, rc);
249         if (rc)
250                 GOTO(out, rc);
251
252         body = lustre_msg_buf(request->rq_repmsg, 0);
253         memcpy(oa, &body->oa, sizeof(*oa));
254
255         EXIT;
256  out:
257         ptlrpc_free_req(request);
258         return rc;
259 }
260
261 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
262                        struct lov_stripe_md *ea)
263 {
264         struct ptlrpc_request *request;
265         struct ost_body *body;
266         int rc, size = sizeof(*body);
267         ENTRY;
268
269         if (!oa) {
270                 CERROR("oa NULL\n");
271                 RETURN(-EINVAL);
272         }
273         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
274                                   &size, NULL);
275         if (!request)
276                 RETURN(-ENOMEM);
277
278         body = lustre_msg_buf(request->rq_reqmsg, 0);
279 #warning FIXME: pack only valid fields instead of memcpy, endianness
280         memcpy(&body->oa, oa, sizeof(*oa));
281
282         request->rq_replen = lustre_msg_size(1, &size);
283
284         rc = ptlrpc_queue_wait(request);
285         rc = ptlrpc_check_status(request, rc);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = lustre_msg_buf(request->rq_repmsg, 0);
290         memcpy(oa, &body->oa, sizeof(*oa));
291
292         EXIT;
293  out:
294         ptlrpc_free_req(request);
295         return rc;
296 }
297
298 struct osc_brw_cb_data {
299         brw_callback_t callback;
300         void *cb_data;
301         void *obd_data;
302         size_t obd_size;
303 };
304
305 /* Our bulk-unmapping bottom half. */
306 static void unmap_and_decref_bulk_desc(void *data)
307 {
308         struct ptlrpc_bulk_desc *desc = data;
309         struct list_head *tmp;
310         ENTRY;
311
312         /* This feels wrong to me. */
313         list_for_each(tmp, &desc->bd_page_list) {
314                 struct ptlrpc_bulk_page *bulk;
315                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
316
317                 kunmap(bulk->bp_page);
318         }
319
320         ptlrpc_bulk_decref(desc);
321         EXIT;
322 }
323
324 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
325 {
326         struct osc_brw_cb_data *cb_data = data;
327         int err = 0;
328         ENTRY;
329
330         if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
331                 err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
332                        -ETIMEDOUT);
333         }
334
335         if (cb_data->callback)
336                 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
337
338         if (cb_data->obd_data)
339                 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
340         OBD_FREE(cb_data, sizeof(*cb_data));
341
342         /* We can't kunmap the desc from interrupt context, so we do it from
343          * the bottom half above. */
344         INIT_TQUEUE(&desc->bd_queue, 0, 0);
345         PREPARE_TQUEUE(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
346         schedule_task(&desc->bd_queue);
347
348         EXIT;
349 }
350
351 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
352                         obd_count page_count, struct brw_page *pga,
353                         brw_callback_t callback, struct io_cb_data *data)
354 {
355         struct ptlrpc_connection *connection =
356                 client_conn2cli(conn)->cl_import.imp_connection;
357         struct ptlrpc_request *request = NULL;
358         struct ptlrpc_bulk_desc *desc = NULL;
359         struct ost_body *body;
360         struct osc_brw_cb_data *cb_data = NULL;
361         int rc, size[3] = {sizeof(*body)};
362         void *iooptr, *nioptr;
363         int mapped = 0;
364         __u32 xid;
365         ENTRY;
366
367         size[1] = sizeof(struct obd_ioobj);
368         size[2] = page_count * sizeof(struct niobuf_remote);
369
370         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
371                                   NULL);
372         if (!request)
373                 RETURN(-ENOMEM);
374
375         body = lustre_msg_buf(request->rq_reqmsg, 0);
376
377         desc = ptlrpc_prep_bulk(connection);
378         if (!desc)
379                 GOTO(out_req, rc = -ENOMEM);
380         desc->bd_portal = OST_BULK_PORTAL;
381         desc->bd_cb = brw_finish;
382         OBD_ALLOC(cb_data, sizeof(*cb_data));
383         if (!cb_data)
384                 GOTO(out_desc, rc = -ENOMEM);
385
386         cb_data->callback = callback;
387         cb_data->cb_data = data;
388         data->desc = desc;
389         desc->bd_cb_data = cb_data;
390
391         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
392         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
393         ost_pack_ioo(&iooptr, md, page_count);
394         /* end almost identical to brw_write case */
395
396         spin_lock(&connection->c_lock);
397         xid = ++connection->c_xid_out;       /* single xid for all pages */
398         spin_unlock(&connection->c_lock);
399
400         for (mapped = 0; mapped < page_count; mapped++) {
401                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
402                 if (bulk == NULL)
403                         GOTO(out_unmap, rc = -ENOMEM);
404
405                 bulk->bp_xid = xid;           /* single xid for all pages */
406
407                 bulk->bp_buf = kmap(pga[mapped].pg);
408                 bulk->bp_page = pga[mapped].pg;
409                 bulk->bp_buflen = PAGE_SIZE;
410                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
411                                 pga[mapped].flag, bulk->bp_xid);
412         }
413
414         /*
415          * Register the bulk first, because the reply could arrive out of order,
416          * and we want to be ready for the bulk data.
417          *
418          * The reference is released when brw_finish is complete.
419          *
420          * On error, we never do the brw_finish, so we handle all decrefs.
421          */
422         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
423                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
424                        OBD_FAIL_OSC_BRW_READ_BULK);
425         } else {
426                 rc = ptlrpc_register_bulk(desc);
427                 if (rc)
428                         GOTO(out_unmap, rc);
429         }
430
431         request->rq_replen = lustre_msg_size(1, size);
432         rc = ptlrpc_queue_wait(request);
433         rc = ptlrpc_check_status(request, rc);
434
435         /*
436          * XXX: If there is an error during the processing of the callback,
437          *      such as a timeout in a sleep that it performs, brw_finish
438          *      will never get called, and we'll leak the desc, fail to kunmap
439          *      things, cats will live with dogs.  One solution would be to
440          *      export brw_finish as osc_brw_finish, so that the timeout case
441          *      and its kin could call it for proper cleanup.  An alternative
442          *      would be for an error return from the callback to cause us to
443          *      clean up, but that doesn't help the truly async cases (like
444          *      LOV), which will immediately return from their PHASE_START
445          *      callback, before any such cleanup-requiring error condition can
446          *      be detected.
447          */
448         if (rc)
449                 GOTO(out_req, rc);
450
451         /* Callbacks cause asynchronous handling. */
452         rc = callback(data, 0, CB_PHASE_START);
453
454 out_req:
455         ptlrpc_req_finished(request);
456         RETURN(rc);
457
458         /* Clean up on error. */
459 out_unmap:
460         while (mapped-- > 0)
461                 kunmap(pga[mapped].pg);
462         OBD_FREE(cb_data, sizeof(*cb_data));
463 out_desc:
464         ptlrpc_bulk_decref(desc);
465         goto out_req;
466 }
467
468 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
469                          obd_count page_count, struct brw_page *pga,
470                          brw_callback_t callback, struct io_cb_data *data)
471 {
472         struct ptlrpc_connection *connection =
473                 client_conn2cli(conn)->cl_import.imp_connection;
474         struct ptlrpc_request *request = NULL;
475         struct ptlrpc_bulk_desc *desc = NULL;
476         struct ost_body *body;
477         struct niobuf_local *local = NULL;
478         struct niobuf_remote *remote;
479         struct osc_brw_cb_data *cb_data = NULL;
480         int rc, j, size[3] = {sizeof(*body)};
481         void *iooptr, *nioptr;
482         int mapped = 0;
483         ENTRY;
484
485         size[1] = sizeof(struct obd_ioobj);
486         size[2] = page_count * sizeof(*remote);
487
488         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
489                                   NULL);
490         if (!request)
491                 RETURN(-ENOMEM);
492
493         body = lustre_msg_buf(request->rq_reqmsg, 0);
494
495         desc = ptlrpc_prep_bulk(connection);
496         if (!desc)
497                 GOTO(out_req, rc = -ENOMEM);
498         desc->bd_portal = OSC_BULK_PORTAL;
499         desc->bd_cb = brw_finish;
500         OBD_ALLOC(cb_data, sizeof(*cb_data));
501         if (!cb_data)
502                 GOTO(out_desc, rc = -ENOMEM);
503
504         cb_data->callback = callback;
505         cb_data->cb_data = data;
506         data->desc = desc;
507         desc->bd_cb_data = cb_data;
508
509         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
510         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
511         ost_pack_ioo(&iooptr, md, page_count);
512         /* end almost identical to brw_read case */
513
514         OBD_ALLOC(local, page_count * sizeof(*local));
515         if (!local)
516                 GOTO(out_cb, rc = -ENOMEM);
517
518         cb_data->obd_data = local;
519         cb_data->obd_size = page_count * sizeof(*local);
520
521         for (mapped = 0; mapped < page_count; mapped++) {
522                 local[mapped].addr = kmap(pga[mapped].pg);
523
524                 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
525                        "%d ; page %d of %d\n",
526                        local[mapped].addr, pga[mapped].pg->flags,
527                        page_count(pga[mapped].pg), 
528                        mapped, page_count - 1);
529
530                 local[mapped].offset = pga[mapped].off;
531                 local[mapped].len = pga[mapped].count;
532                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
533                                 pga[mapped].flag, 0);
534         }
535
536         size[1] = page_count * sizeof(*remote);
537         request->rq_replen = lustre_msg_size(2, size);
538         rc = ptlrpc_queue_wait(request);
539         rc = ptlrpc_check_status(request, rc);
540         if (rc)
541                 GOTO(out_unmap, rc);
542
543         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
544         if (!nioptr)
545                 GOTO(out_unmap, rc = -EINVAL);
546
547         if (request->rq_repmsg->buflens[1] != size[1]) {
548                 CERROR("buffer length wrong (%d vs. %d)\n",
549                        request->rq_repmsg->buflens[1], size[1]);
550                 GOTO(out_unmap, rc = -EINVAL);
551         }
552
553         for (j = 0; j < page_count; j++) {
554                 struct ptlrpc_bulk_page *bulk;
555
556                 ost_unpack_niobuf(&nioptr, &remote);
557
558                 bulk = ptlrpc_prep_bulk_page(desc);
559                 if (!bulk)
560                         GOTO(out_unmap, rc = -ENOMEM);
561
562                 bulk->bp_buf = (void *)(unsigned long)local[j].addr;
563                 bulk->bp_buflen = local[j].len;
564                 bulk->bp_xid = remote->xid;
565                 bulk->bp_page = pga[j].pg;
566         }
567
568         if (desc->bd_page_count != page_count)
569                 LBUG();
570
571         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
572                 GOTO(out_unmap, rc = 0);
573
574         /* Our reference is released when brw_finish is complete. */
575         rc = ptlrpc_send_bulk(desc);
576
577         /* XXX: Mike, same question as in osc_brw_read. */
578         if (rc)
579                 GOTO(out_req, rc);
580
581         /* Callbacks cause asynchronous handling. */
582         rc = callback(data, 0, CB_PHASE_START);
583
584 out_req:
585         ptlrpc_req_finished(request);
586         RETURN(rc);
587
588         /* Clean up on error. */
589 out_unmap:
590         while (mapped-- > 0)
591                 kunmap(pga[mapped].pg);
592
593         OBD_FREE(local, page_count * sizeof(*local));
594 out_cb:
595         OBD_FREE(cb_data, sizeof(*cb_data));
596 out_desc:
597         ptlrpc_bulk_decref(desc);
598         goto out_req;
599 }
600
601 static int osc_brw(int cmd, struct lustre_handle *conn,
602                    struct lov_stripe_md *md, obd_count page_count,
603                    struct brw_page *pga, brw_callback_t callback,
604                    struct io_cb_data *data)
605 {
606         if (cmd & OBD_BRW_WRITE)
607                 return osc_brw_write(conn, md, page_count, pga, callback, data);
608         else
609                 return osc_brw_read(conn, md, page_count, pga, callback, data);
610 }
611
612 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
613                        struct lustre_handle *parent_lock,
614                        __u32 type, void *extentp, int extent_len, __u32 mode,
615                        int *flags, void *callback, void *data, int datalen,
616                        struct lustre_handle *lockh)
617 {
618         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
619         struct obd_device *obddev = class_conn2obd(connh);
620         struct ldlm_extent *extent = extentp;
621         int rc;
622         __u32 mode2;
623
624         /* Filesystem locks are given a bit of special treatment: first we
625          * fixup the lock to start and end on page boundaries. */
626         extent->start &= PAGE_MASK;
627         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
628
629         /* Next, search for already existing extent locks that will cover us */
630         //osc_con2dlmcl(conn, &cl, &connection, &rconn);
631         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
632                              sizeof(extent), mode, lockh);
633         if (rc == 1) {
634                 /* We already have a lock, and it's referenced */
635                 return 0;
636         }
637
638         /* Next, search for locks that we can upgrade (if we're trying to write)
639          * or are more than we need (if we're trying to read).  Because the VFS
640          * and page cache already protect us locally, lots of readers/writers
641          * can share a single PW lock. */
642         if (mode == LCK_PW)
643                 mode2 = LCK_PR;
644         else
645                 mode2 = LCK_PW;
646
647         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
648                              sizeof(extent), mode2, lockh);
649         if (rc == 1) {
650                 int flags;
651                 /* FIXME: This is not incredibly elegant, but it might
652                  * be more elegant than adding another parameter to
653                  * lock_match.  I want a second opinion. */
654                 ldlm_lock_addref(lockh, mode);
655                 ldlm_lock_decref(lockh, mode2);
656
657                 if (mode == LCK_PR)
658                         return 0;
659
660                 rc = ldlm_cli_convert(lockh, mode, &flags);
661                 if (rc)
662                         LBUG();
663
664                 return rc;
665         }
666
667         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
668                               parent_lock, res_id, type, extent,
669                               sizeof(extent), mode, flags, ldlm_completion_ast,
670                               callback, data, datalen, lockh);
671         return rc;
672 }
673
674 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
675                       __u32 mode, struct lustre_handle *lockh)
676 {
677         ENTRY;
678
679         ldlm_lock_decref(lockh, mode);
680
681         RETURN(0);
682 }
683
684 static int osc_cancel_unused(struct lustre_handle *connh,
685                              struct lov_stripe_md *lsm, int local)
686 {
687         struct obd_device *obddev = class_conn2obd(connh);
688         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
689
690         return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, local);
691 }
692
693 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
694 {
695         struct ptlrpc_request *request;
696         int rc, size = sizeof(*osfs);
697         ENTRY;
698
699         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
700                                   NULL);
701         if (!request)
702                 RETURN(-ENOMEM);
703
704         request->rq_replen = lustre_msg_size(1, &size);
705
706         rc = ptlrpc_queue_wait(request);
707         rc = ptlrpc_check_status(request, rc);
708         if (rc) {
709                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
710                 GOTO(out, rc);
711         }
712
713         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
714
715         EXIT;
716  out:
717         ptlrpc_free_req(request);
718         return rc;
719 }
720
721 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
722                          void *karg, void *uarg)
723 {
724         struct obd_device *obddev = class_conn2obd(conn);
725         struct obd_ioctl_data *data = karg;
726         int err = 0;
727         ENTRY;
728
729         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
730             _IOC_NR(cmd) < IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
731                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
732                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
733                 RETURN(-EINVAL);
734         }
735
736         switch (cmd) {
737         case IOC_LDLM_TEST: {
738                 err = ldlm_test(obddev, conn);
739                 CERROR("-- done err %d\n", err);
740                 GOTO(out, err);
741         }
742         case IOC_LDLM_REGRESS_START: {
743                 unsigned int numthreads = 1;
744                 unsigned int numheld = 10;
745                 unsigned int numres = 10;
746                 unsigned int numext = 10;
747                 char *parse;
748
749                 if (data->ioc_inllen1) {
750                         parse = data->ioc_inlbuf1;
751                         if (*parse != '\0') {
752                                 while(isspace(*parse)) parse++;
753                                 numthreads = simple_strtoul(parse, &parse, 0);
754                                 while(isspace(*parse)) parse++;
755                         }
756                         if (*parse != '\0') {
757                                 while(isspace(*parse)) parse++;
758                                 numheld = simple_strtoul(parse, &parse, 0);
759                                 while(isspace(*parse)) parse++;
760                         }
761                         if (*parse != '\0') {
762                                 while(isspace(*parse)) parse++;
763                                 numres = simple_strtoul(parse, &parse, 0);
764                                 while(isspace(*parse)) parse++;
765                         }
766                         if (*parse != '\0') {
767                                 while(isspace(*parse)) parse++;
768                                 numext = simple_strtoul(parse, &parse, 0);
769                                 while(isspace(*parse)) parse++;
770                         }
771                 }
772
773                 err = ldlm_regression_start(obddev, conn, numthreads,
774                                 numheld, numres, numext);
775
776                 CERROR("-- done err %d\n", err);
777                 GOTO(out, err);
778         }
779         case IOC_LDLM_REGRESS_STOP: {
780                 err = ldlm_regression_stop();
781                 CERROR("-- done err %d\n", err);
782                 GOTO(out, err);
783         }
784         default:
785                 GOTO(out, err = -EINVAL);
786         }
787 out:
788         return err;
789 }
790
791 struct obd_ops osc_obd_ops = {
792         o_setup:        client_obd_setup,
793         o_cleanup:      client_obd_cleanup,
794         o_statfs:       osc_statfs,
795         o_create:       osc_create,
796         o_destroy:      osc_destroy,
797         o_getattr:      osc_getattr,
798         o_setattr:      osc_setattr,
799         o_open:         osc_open,
800         o_close:        osc_close,
801         o_connect:      client_obd_connect,
802         o_disconnect:   client_obd_disconnect,
803         o_brw:          osc_brw,
804         o_punch:        osc_punch,
805         o_enqueue:      osc_enqueue,
806         o_cancel:       osc_cancel,
807         o_cancel_unused: osc_cancel_unused,
808         o_iocontrol:    osc_iocontrol
809 };
810
811 static int __init osc_init(void)
812 {
813         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
814 }
815
816 static void __exit osc_exit(void)
817 {
818         class_unregister_type(LUSTRE_OSC_NAME);
819 }
820
821 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
822 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
823 MODULE_LICENSE("GPL");
824
825 module_init(osc_init);
826 module_exit(osc_exit);