Whamcloud - gitweb
- compile fixes for 2.5.44
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/mm.h>
24 #include <linux/highmem.h>
25 #include <linux/lustre_dlm.h>
26 #include <linux/workqueue.h>
27 #include <linux/lustre_mds.h> /* for mds_objid */
28 #include <linux/obd_ost.h>
29 #include <linux/obd_lov.h>
30 #include <linux/ctype.h>
31 #include <linux/init.h>
32 #include <linux/lustre_ha.h>
33 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
34 #include <linux/lustre_lite.h> /* for ll_i2info */
35
36 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
37                        struct lov_stripe_md *md)
38 {
39         struct ptlrpc_request *request;
40         struct ost_body *body;
41         int rc, size = sizeof(*body);
42         ENTRY;
43
44         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
45                                   &size, NULL);
46         if (!request)
47                 RETURN(-ENOMEM);
48
49         body = lustre_msg_buf(request->rq_reqmsg, 0);
50 #warning FIXME: pack only valid fields instead of memcpy, endianness
51         memcpy(&body->oa, oa, sizeof(*oa));
52
53         request->rq_replen = lustre_msg_size(1, &size);
54
55         rc = ptlrpc_queue_wait(request);
56         rc = ptlrpc_check_status(request, rc);
57         if (rc) {
58                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
59                 GOTO(out, rc);
60         }
61
62         body = lustre_msg_buf(request->rq_repmsg, 0);
63         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
64         if (oa)
65                 memcpy(oa, &body->oa, sizeof(*oa));
66
67         EXIT;
68  out:
69         ptlrpc_req_finished(request);
70         return rc;
71 }
72
73 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
74                     struct lov_stripe_md *md)
75 {
76         struct ptlrpc_request *request;
77         struct ost_body *body;
78         int rc, size = sizeof(*body);
79         ENTRY;
80
81         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
82                                   NULL);
83         if (!request)
84                 RETURN(-ENOMEM);
85
86         body = lustre_msg_buf(request->rq_reqmsg, 0);
87 #warning FIXME: pack only valid fields instead of memcpy, endianness
88         memcpy(&body->oa, oa, sizeof(*oa));
89
90         request->rq_replen = lustre_msg_size(1, &size);
91
92         rc = ptlrpc_queue_wait(request);
93         rc = ptlrpc_check_status(request, rc);
94         if (rc)
95                 GOTO(out, rc);
96
97         body = lustre_msg_buf(request->rq_repmsg, 0);
98         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
99         if (oa)
100                 memcpy(oa, &body->oa, sizeof(*oa));
101
102         EXIT;
103  out:
104         ptlrpc_req_finished(request);
105         return rc;
106 }
107
108 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
109                      struct lov_stripe_md *md)
110 {
111         struct ptlrpc_request *request;
112         struct ost_body *body;
113         int rc, size = sizeof(*body);
114         ENTRY;
115
116         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
117                                   NULL);
118         if (!request)
119                 RETURN(-ENOMEM);
120
121         body = lustre_msg_buf(request->rq_reqmsg, 0);
122 #warning FIXME: pack only valid fields instead of memcpy, endianness
123         memcpy(&body->oa, oa, sizeof(*oa));
124
125         request->rq_replen = lustre_msg_size(1, &size);
126
127         rc = ptlrpc_queue_wait(request);
128         rc = ptlrpc_check_status(request, rc);
129         if (rc)
130                 GOTO(out, rc);
131
132         body = lustre_msg_buf(request->rq_repmsg, 0);
133         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
134         if (oa)
135                 memcpy(oa, &body->oa, sizeof(*oa));
136
137         EXIT;
138  out:
139         ptlrpc_req_finished(request);
140         return rc;
141 }
142
143 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
144                        struct lov_stripe_md *md)
145 {
146         struct ptlrpc_request *request;
147         struct ost_body *body;
148         int rc, size = sizeof(*body);
149         ENTRY;
150
151         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
152                                   &size, NULL);
153         if (!request)
154                 RETURN(-ENOMEM);
155
156         body = lustre_msg_buf(request->rq_reqmsg, 0);
157         memcpy(&body->oa, oa, sizeof(*oa));
158
159         request->rq_replen = lustre_msg_size(1, &size);
160
161         rc = ptlrpc_queue_wait(request);
162         rc = ptlrpc_check_status(request, rc);
163
164         ptlrpc_req_finished(request);
165         return rc;
166 }
167
168 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
169                       struct lov_stripe_md **ea)
170 {
171         struct ptlrpc_request *request;
172         struct ost_body *body;
173         struct lov_stripe_md *lsm;
174         int rc, size = sizeof(*body);
175         ENTRY;
176
177         LASSERT(oa);
178         LASSERT(ea);
179
180         lsm = *ea;
181         if (!lsm) {
182                 // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
183                 OBD_ALLOC(lsm, oa->o_easize);
184                 if (!lsm)
185                         RETURN(-ENOMEM);
186                 lsm->lsm_mds_easize = oa->o_easize;
187         }
188
189         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
190                                   NULL);
191         if (!request)
192                 GOTO(out, rc = -ENOMEM);
193
194         body = lustre_msg_buf(request->rq_reqmsg, 0);
195         memcpy(&body->oa, oa, sizeof(*oa));
196
197         request->rq_replen = lustre_msg_size(1, &size);
198
199         rc = ptlrpc_queue_wait(request);
200         rc = ptlrpc_check_status(request, rc);
201         if (rc)
202                 GOTO(out_req, rc);
203
204         body = lustre_msg_buf(request->rq_repmsg, 0);
205         memcpy(oa, &body->oa, sizeof(*oa));
206
207         lsm->lsm_object_id = oa->o_id;
208         lsm->lsm_stripe_count = 0;
209         *ea = lsm;
210         EXIT;
211 out_req:
212         ptlrpc_req_finished(request);
213 out:
214         if (rc && !*ea)
215                 OBD_FREE(lsm, oa->o_easize);
216         return rc;
217 }
218
219 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
220                      struct lov_stripe_md *md, obd_size start,
221                      obd_size end)
222 {
223         struct ptlrpc_request *request;
224         struct ost_body *body;
225         int rc, size = sizeof(*body);
226         ENTRY;
227
228         if (!oa) {
229                 CERROR("oa NULL\n");
230                 RETURN(-EINVAL);
231         }
232
233         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
234                                   NULL);
235         if (!request)
236                 RETURN(-ENOMEM);
237
238         body = lustre_msg_buf(request->rq_reqmsg, 0);
239 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
240         memcpy(&body->oa, oa, sizeof(*oa));
241
242         /* overload the blocks and size fields in the oa with start/end */
243 #warning FIXME: endianness, size=start, blocks=end?
244         body->oa.o_blocks = start;
245         body->oa.o_size = end;
246         body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
247
248         request->rq_replen = lustre_msg_size(1, &size);
249
250         rc = ptlrpc_queue_wait(request);
251         rc = ptlrpc_check_status(request, rc);
252         if (rc)
253                 GOTO(out, rc);
254
255         body = lustre_msg_buf(request->rq_repmsg, 0);
256         memcpy(oa, &body->oa, sizeof(*oa));
257
258         EXIT;
259  out:
260         ptlrpc_req_finished(request);
261         return rc;
262 }
263
264 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
265                        struct lov_stripe_md *ea)
266 {
267         struct ptlrpc_request *request;
268         struct ost_body *body;
269         int rc, size = sizeof(*body);
270         ENTRY;
271
272         if (!oa) {
273                 CERROR("oa NULL\n");
274                 RETURN(-EINVAL);
275         }
276         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
277                                   &size, NULL);
278         if (!request)
279                 RETURN(-ENOMEM);
280
281         body = lustre_msg_buf(request->rq_reqmsg, 0);
282 #warning FIXME: pack only valid fields instead of memcpy, endianness
283         memcpy(&body->oa, oa, sizeof(*oa));
284
285         request->rq_replen = lustre_msg_size(1, &size);
286
287         rc = ptlrpc_queue_wait(request);
288         rc = ptlrpc_check_status(request, rc);
289         if (rc)
290                 GOTO(out, rc);
291
292         body = lustre_msg_buf(request->rq_repmsg, 0);
293         memcpy(oa, &body->oa, sizeof(*oa));
294
295         EXIT;
296  out:
297         ptlrpc_req_finished(request);
298         return rc;
299 }
300
301 struct osc_brw_cb_data {
302         brw_callback_t callback;
303         void *cb_data;
304         void *obd_data;
305         size_t obd_size;
306 };
307
308 /* Our bulk-unmapping bottom half. */
309 static void unmap_and_decref_bulk_desc(void *data)
310 {
311         struct ptlrpc_bulk_desc *desc = data;
312         struct list_head *tmp;
313         ENTRY;
314
315         /* This feels wrong to me. */
316         list_for_each(tmp, &desc->bd_page_list) {
317                 struct ptlrpc_bulk_page *bulk;
318                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
319
320                 kunmap(bulk->bp_page);
321         }
322
323         ptlrpc_bulk_decref(desc);
324         EXIT;
325 }
326
327 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
328 {
329         struct osc_brw_cb_data *cb_data = data;
330         int err = 0;
331         ENTRY;
332
333         if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
334                 err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
335                        -ETIMEDOUT);
336         }
337
338         if (cb_data->callback)
339                 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
340
341         if (cb_data->obd_data)
342                 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
343         OBD_FREE(cb_data, sizeof(*cb_data));
344
345         /* We can't kunmap the desc from interrupt context, so we do it from
346          * the bottom half above. */
347         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
348         schedule_work(&desc->bd_queue);
349
350         EXIT;
351 }
352
353 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
354                         obd_count page_count, struct brw_page *pga,
355                         brw_callback_t callback, struct io_cb_data *data)
356 {
357         struct ptlrpc_connection *connection =
358                 client_conn2cli(conn)->cl_import.imp_connection;
359         struct ptlrpc_request *request = NULL;
360         struct ptlrpc_bulk_desc *desc = NULL;
361         struct ost_body *body;
362         struct osc_brw_cb_data *cb_data = NULL;
363         int rc, size[3] = {sizeof(*body)};
364         void *iooptr, *nioptr;
365         int mapped = 0;
366         __u32 xid;
367         ENTRY;
368
369         size[1] = sizeof(struct obd_ioobj);
370         size[2] = page_count * sizeof(struct niobuf_remote);
371
372         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
373                                   NULL);
374         if (!request)
375                 RETURN(-ENOMEM);
376
377         body = lustre_msg_buf(request->rq_reqmsg, 0);
378
379         desc = ptlrpc_prep_bulk(connection);
380         if (!desc)
381                 GOTO(out_req, rc = -ENOMEM);
382         desc->bd_portal = OST_BULK_PORTAL;
383         desc->bd_cb = brw_finish;
384         OBD_ALLOC(cb_data, sizeof(*cb_data));
385         if (!cb_data)
386                 GOTO(out_desc, rc = -ENOMEM);
387
388         cb_data->callback = callback;
389         cb_data->cb_data = data;
390         data->desc = desc;
391         desc->bd_cb_data = cb_data;
392
393         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
394         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
395         ost_pack_ioo(&iooptr, lsm, page_count);
396         /* end almost identical to brw_write case */
397
398         spin_lock(&connection->c_lock);
399         xid = ++connection->c_xid_out;       /* single xid for all pages */
400         spin_unlock(&connection->c_lock);
401
402         for (mapped = 0; mapped < page_count; mapped++) {
403                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
404                 if (bulk == NULL)
405                         GOTO(out_unmap, rc = -ENOMEM);
406
407                 bulk->bp_xid = xid;           /* single xid for all pages */
408
409                 bulk->bp_buf = kmap(pga[mapped].pg);
410                 bulk->bp_page = pga[mapped].pg;
411                 bulk->bp_buflen = PAGE_SIZE;
412                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
413                                 pga[mapped].flag, bulk->bp_xid);
414         }
415
416         /*
417          * Register the bulk first, because the reply could arrive out of order,
418          * and we want to be ready for the bulk data.
419          *
420          * The reference is released when brw_finish is complete.
421          *
422          * On error, we never do the brw_finish, so we handle all decrefs.
423          */
424         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
425                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
426                        OBD_FAIL_OSC_BRW_READ_BULK);
427         } else {
428                 rc = ptlrpc_register_bulk(desc);
429                 if (rc)
430                         GOTO(out_unmap, rc);
431         }
432
433         request->rq_replen = lustre_msg_size(1, size);
434         rc = ptlrpc_queue_wait(request);
435         rc = ptlrpc_check_status(request, rc);
436
437         /*
438          * XXX: If there is an error during the processing of the callback,
439          *      such as a timeout in a sleep that it performs, brw_finish
440          *      will never get called, and we'll leak the desc, fail to kunmap
441          *      things, cats will live with dogs.  One solution would be to
442          *      export brw_finish as osc_brw_finish, so that the timeout case
443          *      and its kin could call it for proper cleanup.  An alternative
444          *      would be for an error return from the callback to cause us to
445          *      clean up, but that doesn't help the truly async cases (like
446          *      LOV), which will immediately return from their PHASE_START
447          *      callback, before any such cleanup-requiring error condition can
448          *      be detected.
449          */
450         if (rc)
451                 GOTO(out_req, rc);
452
453         /* Callbacks cause asynchronous handling. */
454         rc = callback(data, 0, CB_PHASE_START);
455
456 out_req:
457         ptlrpc_req_finished(request);
458         RETURN(rc);
459
460         /* Clean up on error. */
461 out_unmap:
462         while (mapped-- > 0)
463                 kunmap(pga[mapped].pg);
464         OBD_FREE(cb_data, sizeof(*cb_data));
465 out_desc:
466         ptlrpc_bulk_decref(desc);
467         goto out_req;
468 }
469
470 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
471                          obd_count page_count, struct brw_page *pga,
472                          brw_callback_t callback, struct io_cb_data *data)
473 {
474         struct ptlrpc_connection *connection =
475                 client_conn2cli(conn)->cl_import.imp_connection;
476         struct ptlrpc_request *request = NULL;
477         struct ptlrpc_bulk_desc *desc = NULL;
478         struct ost_body *body;
479         struct niobuf_local *local = NULL;
480         struct niobuf_remote *remote;
481         struct osc_brw_cb_data *cb_data = NULL;
482         int rc, j, size[3] = {sizeof(*body)};
483         void *iooptr, *nioptr;
484         int mapped = 0;
485         ENTRY;
486
487         size[1] = sizeof(struct obd_ioobj);
488         size[2] = page_count * sizeof(*remote);
489
490         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
491                                   NULL);
492         if (!request)
493                 RETURN(-ENOMEM);
494
495         body = lustre_msg_buf(request->rq_reqmsg, 0);
496
497         desc = ptlrpc_prep_bulk(connection);
498         if (!desc)
499                 GOTO(out_req, rc = -ENOMEM);
500         desc->bd_portal = OSC_BULK_PORTAL;
501         desc->bd_cb = brw_finish;
502         OBD_ALLOC(cb_data, sizeof(*cb_data));
503         if (!cb_data)
504                 GOTO(out_desc, rc = -ENOMEM);
505
506         cb_data->callback = callback;
507         cb_data->cb_data = data;
508         data->desc = desc;
509         desc->bd_cb_data = cb_data;
510
511         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
512         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
513         ost_pack_ioo(&iooptr, md, page_count);
514         /* end almost identical to brw_read case */
515
516         OBD_ALLOC(local, page_count * sizeof(*local));
517         if (!local)
518                 GOTO(out_cb, rc = -ENOMEM);
519
520         cb_data->obd_data = local;
521         cb_data->obd_size = page_count * sizeof(*local);
522
523         for (mapped = 0; mapped < page_count; mapped++) {
524                 local[mapped].addr = kmap(pga[mapped].pg);
525
526                 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
527                        "%d ; page %d of %d\n",
528                        local[mapped].addr, pga[mapped].pg->flags,
529                        page_count(pga[mapped].pg),
530                        mapped, page_count - 1);
531
532                 local[mapped].offset = pga[mapped].off;
533                 local[mapped].len = pga[mapped].count;
534                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
535                                 pga[mapped].flag, 0);
536         }
537
538         size[1] = page_count * sizeof(*remote);
539         request->rq_replen = lustre_msg_size(2, size);
540         rc = ptlrpc_queue_wait(request);
541         rc = ptlrpc_check_status(request, rc);
542         if (rc)
543                 GOTO(out_unmap, rc);
544
545         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
546         if (!nioptr)
547                 GOTO(out_unmap, rc = -EINVAL);
548
549         if (request->rq_repmsg->buflens[1] != size[1]) {
550                 CERROR("buffer length wrong (%d vs. %d)\n",
551                        request->rq_repmsg->buflens[1], size[1]);
552                 GOTO(out_unmap, rc = -EINVAL);
553         }
554
555         for (j = 0; j < page_count; j++) {
556                 struct ptlrpc_bulk_page *bulk;
557
558                 ost_unpack_niobuf(&nioptr, &remote);
559
560                 bulk = ptlrpc_prep_bulk_page(desc);
561                 if (!bulk)
562                         GOTO(out_unmap, rc = -ENOMEM);
563
564                 bulk->bp_buf = (void *)(unsigned long)local[j].addr;
565                 bulk->bp_buflen = local[j].len;
566                 bulk->bp_xid = remote->xid;
567                 bulk->bp_page = pga[j].pg;
568         }
569
570         if (desc->bd_page_count != page_count)
571                 LBUG();
572
573         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
574                 GOTO(out_unmap, rc = 0);
575
576         /* Our reference is released when brw_finish is complete. */
577         rc = ptlrpc_send_bulk(desc);
578
579         /* XXX: Mike, same question as in osc_brw_read. */
580         if (rc)
581                 GOTO(out_req, rc);
582
583         /* Callbacks cause asynchronous handling. */
584         rc = callback(data, 0, CB_PHASE_START);
585
586 out_req:
587         ptlrpc_req_finished(request);
588         RETURN(rc);
589
590         /* Clean up on error. */
591 out_unmap:
592         while (mapped-- > 0)
593                 kunmap(pga[mapped].pg);
594
595         OBD_FREE(local, page_count * sizeof(*local));
596 out_cb:
597         OBD_FREE(cb_data, sizeof(*cb_data));
598 out_desc:
599         ptlrpc_bulk_decref(desc);
600         goto out_req;
601 }
602
603 static int osc_brw(int cmd, struct lustre_handle *conn,
604                    struct lov_stripe_md *md, obd_count page_count,
605                    struct brw_page *pga, brw_callback_t callback,
606                    struct io_cb_data *data)
607 {
608         if (cmd & OBD_BRW_WRITE)
609                 return osc_brw_write(conn, md, page_count, pga, callback, data);
610         else
611                 return osc_brw_read(conn, md, page_count, pga, callback, data);
612 }
613
614 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
615                        struct lustre_handle *parent_lock,
616                        __u32 type, void *extentp, int extent_len, __u32 mode,
617                        int *flags, void *callback, void *data, int datalen,
618                        struct lustre_handle *lockh)
619 {
620         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
621         struct obd_device *obddev = class_conn2obd(connh);
622         struct ldlm_extent *extent = extentp;
623         int rc;
624         ENTRY;
625
626         /* Filesystem locks are given a bit of special treatment: if
627          * this is not a file size lock (which has end == -1), we
628          * fixup the lock to start and end on page boundaries. */
629         if (extent->end != OBD_OBJECT_EOF) {
630                 extent->start &= PAGE_MASK;
631                 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
632         }
633
634         /* Next, search for already existing extent locks that will cover us */
635         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
636                              sizeof(extent), mode, lockh);
637         if (rc == 1)
638                 RETURN(0); /* We already have a lock, and it's referenced */
639
640         /* If we're trying to read, we also search for an existing PW lock.  The
641          * VFS and page cache already protect us locally, so lots of readers/
642          * writers can share a single PW lock.
643          *
644          * There are problems with conversion deadlocks, so instead of
645          * converting a read lock to a write lock, we'll just enqueue a new
646          * one.
647          *
648          * At some point we should cancel the read lock instead of making them
649          * send us a blocking callback, but there are problems with canceling
650          * locks out from other users right now, too. */
651
652         if (mode == LCK_PR) {
653                 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
654                                      extent, sizeof(extent), LCK_PW, lockh);
655                 if (rc == 1) {
656                         /* FIXME: This is not incredibly elegant, but it might
657                          * be more elegant than adding another parameter to
658                          * lock_match.  I want a second opinion. */
659                         ldlm_lock_addref(lockh, LCK_PR);
660                         ldlm_lock_decref(lockh, LCK_PW);
661
662                         RETURN(0);
663                 }
664         }
665
666         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
667                               res_id, type, extent, sizeof(extent), mode, flags,
668                               ldlm_completion_ast, callback, data, datalen,
669                               lockh);
670         RETURN(rc);
671 }
672
673 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
674                       __u32 mode, struct lustre_handle *lockh)
675 {
676         ENTRY;
677
678         ldlm_lock_decref(lockh, mode);
679
680         RETURN(0);
681 }
682
683 static int osc_cancel_unused(struct lustre_handle *connh,
684                              struct lov_stripe_md *lsm, int local)
685 {
686         struct obd_device *obddev = class_conn2obd(connh);
687         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
688
689         return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, local);
690 }
691
692 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
693 {
694         struct ptlrpc_request *request;
695         int rc, size = sizeof(*osfs);
696         ENTRY;
697
698         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
699                                   NULL);
700         if (!request)
701                 RETURN(-ENOMEM);
702
703         request->rq_replen = lustre_msg_size(1, &size);
704
705         rc = ptlrpc_queue_wait(request);
706         rc = ptlrpc_check_status(request, rc);
707         if (rc) {
708                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
709                 GOTO(out, rc);
710         }
711
712         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
713
714         EXIT;
715  out:
716         ptlrpc_req_finished(request);
717         return rc;
718 }
719
720 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
721                          void *karg, void *uarg)
722 {
723         struct obd_device *obddev = class_conn2obd(conn);
724         struct obd_ioctl_data *data = karg;
725         int err = 0;
726         ENTRY;
727
728         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
729             _IOC_NR(cmd) < IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
730                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
731                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
732                 RETURN(-EINVAL);
733         }
734
735         switch (cmd) {
736         case IOC_LDLM_TEST: {
737                 err = ldlm_test(obddev, conn);
738                 CERROR("-- done err %d\n", err);
739                 GOTO(out, err);
740         }
741         case IOC_LDLM_REGRESS_START: {
742                 unsigned int numthreads = 1;
743                 unsigned int numheld = 10;
744                 unsigned int numres = 10;
745                 unsigned int numext = 10;
746                 char *parse;
747
748                 if (data->ioc_inllen1) {
749                         parse = data->ioc_inlbuf1;
750                         if (*parse != '\0') {
751                                 while(isspace(*parse)) parse++;
752                                 numthreads = simple_strtoul(parse, &parse, 0);
753                                 while(isspace(*parse)) parse++;
754                         }
755                         if (*parse != '\0') {
756                                 while(isspace(*parse)) parse++;
757                                 numheld = simple_strtoul(parse, &parse, 0);
758                                 while(isspace(*parse)) parse++;
759                         }
760                         if (*parse != '\0') {
761                                 while(isspace(*parse)) parse++;
762                                 numres = simple_strtoul(parse, &parse, 0);
763                                 while(isspace(*parse)) parse++;
764                         }
765                         if (*parse != '\0') {
766                                 while(isspace(*parse)) parse++;
767                                 numext = simple_strtoul(parse, &parse, 0);
768                                 while(isspace(*parse)) parse++;
769                         }
770                 }
771
772                 err = ldlm_regression_start(obddev, conn, numthreads,
773                                 numheld, numres, numext);
774
775                 CERROR("-- done err %d\n", err);
776                 GOTO(out, err);
777         }
778         case IOC_LDLM_REGRESS_STOP: {
779                 err = ldlm_regression_stop();
780                 CERROR("-- done err %d\n", err);
781                 GOTO(out, err);
782         }
783         default:
784                 GOTO(out, err = -EINVAL);
785         }
786 out:
787         return err;
788 }
789
790 struct obd_ops osc_obd_ops = {
791         o_setup:        client_obd_setup,
792         o_cleanup:      client_obd_cleanup,
793         o_statfs:       osc_statfs,
794         o_create:       osc_create,
795         o_destroy:      osc_destroy,
796         o_getattr:      osc_getattr,
797         o_setattr:      osc_setattr,
798         o_open:         osc_open,
799         o_close:        osc_close,
800         o_connect:      client_obd_connect,
801         o_disconnect:   client_obd_disconnect,
802         o_brw:          osc_brw,
803         o_punch:        osc_punch,
804         o_enqueue:      osc_enqueue,
805         o_cancel:       osc_cancel,
806         o_cancel_unused: osc_cancel_unused,
807         o_iocontrol:    osc_iocontrol
808 };
809
810 static int __init osc_init(void)
811 {
812         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
813 }
814
815 static void __exit osc_exit(void)
816 {
817         class_unregister_type(LUSTRE_OSC_NAME);
818 }
819
820 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
821 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
822 MODULE_LICENSE("GPL");
823
824 module_init(osc_init);
825 module_exit(osc_exit);