Whamcloud - gitweb
47b7c5bfa9a6fcd62d624605624a2cde5658639d
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/mm.h>
24 #include <linux/highmem.h>
25 #include <linux/lustre_dlm.h>
26 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
27 #include <linux/workqueue.h>
28 #endif
29 #include <linux/kp30.h>
30 #include <linux/lustre_mds.h> /* for mds_objid */
31 #include <linux/obd_ost.h>
32 #include <linux/obd_lov.h>
33 #include <linux/ctype.h>
34 #include <linux/init.h>
35 #include <linux/lustre_ha.h>
36 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
37 #include <linux/lustre_lite.h> /* for ll_i2info */
38
39 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
40                        struct lov_stripe_md *md)
41 {
42         struct ptlrpc_request *request;
43         struct ost_body *body;
44         int rc, size = sizeof(*body);
45         ENTRY;
46
47         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
48                                   &size, NULL);
49         if (!request)
50                 RETURN(-ENOMEM);
51
52         body = lustre_msg_buf(request->rq_reqmsg, 0);
53 #warning FIXME: pack only valid fields instead of memcpy, endianness
54         memcpy(&body->oa, oa, sizeof(*oa));
55
56         request->rq_replen = lustre_msg_size(1, &size);
57
58         rc = ptlrpc_queue_wait(request);
59         rc = ptlrpc_check_status(request, rc);
60         if (rc) {
61                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
62                 GOTO(out, rc);
63         }
64
65         body = lustre_msg_buf(request->rq_repmsg, 0);
66         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
67         if (oa)
68                 memcpy(oa, &body->oa, sizeof(*oa));
69
70         EXIT;
71  out:
72         ptlrpc_req_finished(request);
73         return rc;
74 }
75
76 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
77                     struct lov_stripe_md *md)
78 {
79         struct ptlrpc_request *request;
80         struct ost_body *body;
81         int rc, size = sizeof(*body);
82         ENTRY;
83
84         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
85                                   NULL);
86         if (!request)
87                 RETURN(-ENOMEM);
88
89         body = lustre_msg_buf(request->rq_reqmsg, 0);
90 #warning FIXME: pack only valid fields instead of memcpy, endianness
91         memcpy(&body->oa, oa, sizeof(*oa));
92
93         request->rq_replen = lustre_msg_size(1, &size);
94
95         rc = ptlrpc_queue_wait(request);
96         rc = ptlrpc_check_status(request, rc);
97         if (rc)
98                 GOTO(out, rc);
99
100         body = lustre_msg_buf(request->rq_repmsg, 0);
101         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
102         if (oa)
103                 memcpy(oa, &body->oa, sizeof(*oa));
104
105         EXIT;
106  out:
107         ptlrpc_req_finished(request);
108         return rc;
109 }
110
111 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
112                      struct lov_stripe_md *md)
113 {
114         struct ptlrpc_request *request;
115         struct ost_body *body;
116         int rc, size = sizeof(*body);
117         ENTRY;
118
119         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
120                                   NULL);
121         if (!request)
122                 RETURN(-ENOMEM);
123
124         body = lustre_msg_buf(request->rq_reqmsg, 0);
125 #warning FIXME: pack only valid fields instead of memcpy, endianness
126         memcpy(&body->oa, oa, sizeof(*oa));
127
128         request->rq_replen = lustre_msg_size(1, &size);
129
130         rc = ptlrpc_queue_wait(request);
131         rc = ptlrpc_check_status(request, rc);
132         if (rc)
133                 GOTO(out, rc);
134
135         body = lustre_msg_buf(request->rq_repmsg, 0);
136         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
137         if (oa)
138                 memcpy(oa, &body->oa, sizeof(*oa));
139
140         EXIT;
141  out:
142         ptlrpc_req_finished(request);
143         return rc;
144 }
145
146 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
147                        struct lov_stripe_md *md)
148 {
149         struct ptlrpc_request *request;
150         struct ost_body *body;
151         int rc, size = sizeof(*body);
152         ENTRY;
153
154         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
155                                   &size, NULL);
156         if (!request)
157                 RETURN(-ENOMEM);
158
159         body = lustre_msg_buf(request->rq_reqmsg, 0);
160         memcpy(&body->oa, oa, sizeof(*oa));
161
162         request->rq_replen = lustre_msg_size(1, &size);
163
164         rc = ptlrpc_queue_wait(request);
165         rc = ptlrpc_check_status(request, rc);
166
167         ptlrpc_req_finished(request);
168         return rc;
169 }
170
171 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
172                       struct lov_stripe_md **ea)
173 {
174         struct ptlrpc_request *request;
175         struct ost_body *body;
176         struct lov_stripe_md *lsm;
177         int rc, size = sizeof(*body);
178         ENTRY;
179
180         LASSERT(oa);
181         LASSERT(ea);
182
183         lsm = *ea;
184         if (!lsm) {
185                 // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
186                 OBD_ALLOC(lsm, oa->o_easize);
187                 if (!lsm)
188                         RETURN(-ENOMEM);
189                 lsm->lsm_mds_easize = oa->o_easize;
190         }
191
192         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
193                                   NULL);
194         if (!request)
195                 GOTO(out, rc = -ENOMEM);
196
197         body = lustre_msg_buf(request->rq_reqmsg, 0);
198         memcpy(&body->oa, oa, sizeof(*oa));
199
200         request->rq_replen = lustre_msg_size(1, &size);
201
202         rc = ptlrpc_queue_wait(request);
203         rc = ptlrpc_check_status(request, rc);
204         if (rc)
205                 GOTO(out_req, rc);
206
207         body = lustre_msg_buf(request->rq_repmsg, 0);
208         memcpy(oa, &body->oa, sizeof(*oa));
209
210         lsm->lsm_object_id = oa->o_id;
211         lsm->lsm_stripe_count = 0;
212         *ea = lsm;
213         EXIT;
214 out_req:
215         ptlrpc_req_finished(request);
216 out:
217         if (rc && !*ea)
218                 OBD_FREE(lsm, oa->o_easize);
219         return rc;
220 }
221
222 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
223                      struct lov_stripe_md *md, obd_size start,
224                      obd_size end)
225 {
226         struct ptlrpc_request *request;
227         struct ost_body *body;
228         int rc, size = sizeof(*body);
229         ENTRY;
230
231         if (!oa) {
232                 CERROR("oa NULL\n");
233                 RETURN(-EINVAL);
234         }
235
236         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
237                                   NULL);
238         if (!request)
239                 RETURN(-ENOMEM);
240
241         body = lustre_msg_buf(request->rq_reqmsg, 0);
242 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
243         memcpy(&body->oa, oa, sizeof(*oa));
244
245         /* overload the blocks and size fields in the oa with start/end */
246 #warning FIXME: endianness, size=start, blocks=end?
247         body->oa.o_blocks = start;
248         body->oa.o_size = end;
249         body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
250
251         request->rq_replen = lustre_msg_size(1, &size);
252
253         rc = ptlrpc_queue_wait(request);
254         rc = ptlrpc_check_status(request, rc);
255         if (rc)
256                 GOTO(out, rc);
257
258         body = lustre_msg_buf(request->rq_repmsg, 0);
259         memcpy(oa, &body->oa, sizeof(*oa));
260
261         EXIT;
262  out:
263         ptlrpc_req_finished(request);
264         return rc;
265 }
266
267 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
268                        struct lov_stripe_md *ea)
269 {
270         struct ptlrpc_request *request;
271         struct ost_body *body;
272         int rc, size = sizeof(*body);
273         ENTRY;
274
275         if (!oa) {
276                 CERROR("oa NULL\n");
277                 RETURN(-EINVAL);
278         }
279         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
280                                   &size, NULL);
281         if (!request)
282                 RETURN(-ENOMEM);
283
284         body = lustre_msg_buf(request->rq_reqmsg, 0);
285 #warning FIXME: pack only valid fields instead of memcpy, endianness
286         memcpy(&body->oa, oa, sizeof(*oa));
287
288         request->rq_replen = lustre_msg_size(1, &size);
289
290         rc = ptlrpc_queue_wait(request);
291         rc = ptlrpc_check_status(request, rc);
292         if (rc)
293                 GOTO(out, rc);
294
295         body = lustre_msg_buf(request->rq_repmsg, 0);
296         memcpy(oa, &body->oa, sizeof(*oa));
297
298         EXIT;
299  out:
300         ptlrpc_req_finished(request);
301         return rc;
302 }
303
304 struct osc_brw_cb_data {
305         brw_callback_t callback;
306         void *cb_data;
307         void *obd_data;
308         size_t obd_size;
309 };
310
311 /* Our bulk-unmapping bottom half. */
312 static void unmap_and_decref_bulk_desc(void *data)
313 {
314         struct ptlrpc_bulk_desc *desc = data;
315         struct list_head *tmp;
316         ENTRY;
317
318         /* This feels wrong to me. */
319         list_for_each(tmp, &desc->bd_page_list) {
320                 struct ptlrpc_bulk_page *bulk;
321                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
322
323                 kunmap(bulk->bp_page);
324         }
325
326         ptlrpc_bulk_decref(desc);
327         EXIT;
328 }
329
330 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
331 {
332         struct osc_brw_cb_data *cb_data = data;
333         int err = 0;
334         ENTRY;
335
336         if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
337                 err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
338                        -ETIMEDOUT);
339         }
340
341         if (cb_data->callback)
342                 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
343
344         if (cb_data->obd_data)
345                 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
346         OBD_FREE(cb_data, sizeof(*cb_data));
347
348         /* We can't kunmap the desc from interrupt context, so we do it from
349          * the bottom half above. */
350         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
351         schedule_work(&desc->bd_queue);
352
353         EXIT;
354 }
355
356 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
357                         obd_count page_count, struct brw_page *pga,
358                         brw_callback_t callback, struct io_cb_data *data)
359 {
360         struct ptlrpc_connection *connection =
361                 client_conn2cli(conn)->cl_import.imp_connection;
362         struct ptlrpc_request *request = NULL;
363         struct ptlrpc_bulk_desc *desc = NULL;
364         struct ost_body *body;
365         struct osc_brw_cb_data *cb_data = NULL;
366         int rc, size[3] = {sizeof(*body)};
367         void *iooptr, *nioptr;
368         int mapped = 0;
369         __u32 xid;
370         ENTRY;
371
372         size[1] = sizeof(struct obd_ioobj);
373         size[2] = page_count * sizeof(struct niobuf_remote);
374
375         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
376                                   NULL);
377         if (!request)
378                 RETURN(-ENOMEM);
379
380         body = lustre_msg_buf(request->rq_reqmsg, 0);
381
382         desc = ptlrpc_prep_bulk(connection);
383         if (!desc)
384                 GOTO(out_req, rc = -ENOMEM);
385         desc->bd_portal = OST_BULK_PORTAL;
386         desc->bd_cb = brw_finish;
387         OBD_ALLOC(cb_data, sizeof(*cb_data));
388         if (!cb_data)
389                 GOTO(out_desc, rc = -ENOMEM);
390
391         cb_data->callback = callback;
392         cb_data->cb_data = data;
393         data->desc = desc;
394         desc->bd_cb_data = cb_data;
395
396         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
397         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
398         ost_pack_ioo(&iooptr, lsm, page_count);
399         /* end almost identical to brw_write case */
400
401         spin_lock(&connection->c_lock);
402         xid = ++connection->c_xid_out;       /* single xid for all pages */
403         spin_unlock(&connection->c_lock);
404
405         for (mapped = 0; mapped < page_count; mapped++) {
406                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
407                 if (bulk == NULL)
408                         GOTO(out_unmap, rc = -ENOMEM);
409
410                 bulk->bp_xid = xid;           /* single xid for all pages */
411
412                 bulk->bp_buf = kmap(pga[mapped].pg);
413                 bulk->bp_page = pga[mapped].pg;
414                 bulk->bp_buflen = PAGE_SIZE;
415                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
416                                 pga[mapped].flag, bulk->bp_xid);
417         }
418
419         /*
420          * Register the bulk first, because the reply could arrive out of order,
421          * and we want to be ready for the bulk data.
422          *
423          * The reference is released when brw_finish is complete.
424          *
425          * On error, we never do the brw_finish, so we handle all decrefs.
426          */
427         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
428                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
429                        OBD_FAIL_OSC_BRW_READ_BULK);
430         } else {
431                 rc = ptlrpc_register_bulk(desc);
432                 if (rc)
433                         GOTO(out_unmap, rc);
434         }
435
436         request->rq_replen = lustre_msg_size(1, size);
437         rc = ptlrpc_queue_wait(request);
438         rc = ptlrpc_check_status(request, rc);
439
440         /*
441          * XXX: If there is an error during the processing of the callback,
442          *      such as a timeout in a sleep that it performs, brw_finish
443          *      will never get called, and we'll leak the desc, fail to kunmap
444          *      things, cats will live with dogs.  One solution would be to
445          *      export brw_finish as osc_brw_finish, so that the timeout case
446          *      and its kin could call it for proper cleanup.  An alternative
447          *      would be for an error return from the callback to cause us to
448          *      clean up, but that doesn't help the truly async cases (like
449          *      LOV), which will immediately return from their PHASE_START
450          *      callback, before any such cleanup-requiring error condition can
451          *      be detected.
452          */
453         if (rc)
454                 GOTO(out_req, rc);
455
456         /* Callbacks cause asynchronous handling. */
457         rc = callback(data, 0, CB_PHASE_START);
458
459 out_req:
460         ptlrpc_req_finished(request);
461         RETURN(rc);
462
463         /* Clean up on error. */
464 out_unmap:
465         while (mapped-- > 0)
466                 kunmap(pga[mapped].pg);
467         OBD_FREE(cb_data, sizeof(*cb_data));
468 out_desc:
469         ptlrpc_bulk_decref(desc);
470         goto out_req;
471 }
472
473 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
474                          obd_count page_count, struct brw_page *pga,
475                          brw_callback_t callback, struct io_cb_data *data)
476 {
477         struct ptlrpc_connection *connection =
478                 client_conn2cli(conn)->cl_import.imp_connection;
479         struct ptlrpc_request *request = NULL;
480         struct ptlrpc_bulk_desc *desc = NULL;
481         struct ost_body *body;
482         struct niobuf_local *local = NULL;
483         struct niobuf_remote *remote;
484         struct osc_brw_cb_data *cb_data = NULL;
485         int rc, j, size[3] = {sizeof(*body)};
486         void *iooptr, *nioptr;
487         int mapped = 0;
488         ENTRY;
489
490         size[1] = sizeof(struct obd_ioobj);
491         size[2] = page_count * sizeof(*remote);
492
493         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
494                                   NULL);
495         if (!request)
496                 RETURN(-ENOMEM);
497
498         body = lustre_msg_buf(request->rq_reqmsg, 0);
499
500         desc = ptlrpc_prep_bulk(connection);
501         if (!desc)
502                 GOTO(out_req, rc = -ENOMEM);
503         desc->bd_portal = OSC_BULK_PORTAL;
504         desc->bd_cb = brw_finish;
505         OBD_ALLOC(cb_data, sizeof(*cb_data));
506         if (!cb_data)
507                 GOTO(out_desc, rc = -ENOMEM);
508
509         cb_data->callback = callback;
510         cb_data->cb_data = data;
511         data->desc = desc;
512         desc->bd_cb_data = cb_data;
513
514         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
515         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
516         ost_pack_ioo(&iooptr, md, page_count);
517         /* end almost identical to brw_read case */
518
519         OBD_ALLOC(local, page_count * sizeof(*local));
520         if (!local)
521                 GOTO(out_cb, rc = -ENOMEM);
522
523         cb_data->obd_data = local;
524         cb_data->obd_size = page_count * sizeof(*local);
525
526         for (mapped = 0; mapped < page_count; mapped++) {
527                 local[mapped].addr = kmap(pga[mapped].pg);
528
529                 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
530                        "%d ; page %d of %d\n",
531                        local[mapped].addr, pga[mapped].pg->flags,
532                        page_count(pga[mapped].pg),
533                        mapped, page_count - 1);
534
535                 local[mapped].offset = pga[mapped].off;
536                 local[mapped].len = pga[mapped].count;
537                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
538                                 pga[mapped].flag, 0);
539         }
540
541         size[1] = page_count * sizeof(*remote);
542         request->rq_replen = lustre_msg_size(2, size);
543         rc = ptlrpc_queue_wait(request);
544         rc = ptlrpc_check_status(request, rc);
545         if (rc)
546                 GOTO(out_unmap, rc);
547
548         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
549         if (!nioptr)
550                 GOTO(out_unmap, rc = -EINVAL);
551
552         if (request->rq_repmsg->buflens[1] != size[1]) {
553                 CERROR("buffer length wrong (%d vs. %d)\n",
554                        request->rq_repmsg->buflens[1], size[1]);
555                 GOTO(out_unmap, rc = -EINVAL);
556         }
557
558         for (j = 0; j < page_count; j++) {
559                 struct ptlrpc_bulk_page *bulk;
560
561                 ost_unpack_niobuf(&nioptr, &remote);
562
563                 bulk = ptlrpc_prep_bulk_page(desc);
564                 if (!bulk)
565                         GOTO(out_unmap, rc = -ENOMEM);
566
567                 bulk->bp_buf = (void *)(unsigned long)local[j].addr;
568                 bulk->bp_buflen = local[j].len;
569                 bulk->bp_xid = remote->xid;
570                 bulk->bp_page = pga[j].pg;
571         }
572
573         if (desc->bd_page_count != page_count)
574                 LBUG();
575
576         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
577                 GOTO(out_unmap, rc = 0);
578
579         /* Our reference is released when brw_finish is complete. */
580         rc = ptlrpc_send_bulk(desc);
581
582         /* XXX: Mike, same question as in osc_brw_read. */
583         if (rc)
584                 GOTO(out_req, rc);
585
586         /* Callbacks cause asynchronous handling. */
587         rc = callback(data, 0, CB_PHASE_START);
588
589 out_req:
590         ptlrpc_req_finished(request);
591         RETURN(rc);
592
593         /* Clean up on error. */
594 out_unmap:
595         while (mapped-- > 0)
596                 kunmap(pga[mapped].pg);
597
598         OBD_FREE(local, page_count * sizeof(*local));
599 out_cb:
600         OBD_FREE(cb_data, sizeof(*cb_data));
601 out_desc:
602         ptlrpc_bulk_decref(desc);
603         goto out_req;
604 }
605
606 static int osc_brw(int cmd, struct lustre_handle *conn,
607                    struct lov_stripe_md *md, obd_count page_count,
608                    struct brw_page *pga, brw_callback_t callback,
609                    struct io_cb_data *data)
610 {
611         if (cmd & OBD_BRW_WRITE)
612                 return osc_brw_write(conn, md, page_count, pga, callback, data);
613         else
614                 return osc_brw_read(conn, md, page_count, pga, callback, data);
615 }
616
617 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
618                        struct lustre_handle *parent_lock,
619                        __u32 type, void *extentp, int extent_len, __u32 mode,
620                        int *flags, void *callback, void *data, int datalen,
621                        struct lustre_handle *lockh)
622 {
623         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
624         struct obd_device *obddev = class_conn2obd(connh);
625         struct ldlm_extent *extent = extentp;
626         int rc;
627         ENTRY;
628
629         /* Filesystem locks are given a bit of special treatment: if
630          * this is not a file size lock (which has end == -1), we
631          * fixup the lock to start and end on page boundaries. */
632         if (extent->end != OBD_OBJECT_EOF) {
633                 extent->start &= PAGE_MASK;
634                 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
635         }
636
637         /* Next, search for already existing extent locks that will cover us */
638         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
639                              sizeof(extent), mode, lockh);
640         if (rc == 1)
641                 RETURN(0); /* We already have a lock, and it's referenced */
642
643         /* If we're trying to read, we also search for an existing PW lock.  The
644          * VFS and page cache already protect us locally, so lots of readers/
645          * writers can share a single PW lock.
646          *
647          * There are problems with conversion deadlocks, so instead of
648          * converting a read lock to a write lock, we'll just enqueue a new
649          * one.
650          *
651          * At some point we should cancel the read lock instead of making them
652          * send us a blocking callback, but there are problems with canceling
653          * locks out from other users right now, too. */
654
655         if (mode == LCK_PR) {
656                 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
657                                      extent, sizeof(extent), LCK_PW, lockh);
658                 if (rc == 1) {
659                         /* FIXME: This is not incredibly elegant, but it might
660                          * be more elegant than adding another parameter to
661                          * lock_match.  I want a second opinion. */
662                         ldlm_lock_addref(lockh, LCK_PR);
663                         ldlm_lock_decref(lockh, LCK_PW);
664
665                         RETURN(0);
666                 }
667         }
668
669         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
670                               res_id, type, extent, sizeof(extent), mode, flags,
671                               ldlm_completion_ast, callback, data, datalen,
672                               lockh);
673         RETURN(rc);
674 }
675
676 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
677                       __u32 mode, struct lustre_handle *lockh)
678 {
679         ENTRY;
680
681         ldlm_lock_decref(lockh, mode);
682
683         RETURN(0);
684 }
685
686 static int osc_cancel_unused(struct lustre_handle *connh,
687                              struct lov_stripe_md *lsm, int local)
688 {
689         struct obd_device *obddev = class_conn2obd(connh);
690         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
691
692         return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, local);
693 }
694
695 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
696 {
697         struct ptlrpc_request *request;
698         int rc, size = sizeof(*osfs);
699         ENTRY;
700
701         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
702                                   NULL);
703         if (!request)
704                 RETURN(-ENOMEM);
705
706         request->rq_replen = lustre_msg_size(1, &size);
707
708         rc = ptlrpc_queue_wait(request);
709         rc = ptlrpc_check_status(request, rc);
710         if (rc) {
711                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
712                 GOTO(out, rc);
713         }
714
715         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
716
717         EXIT;
718  out:
719         ptlrpc_req_finished(request);
720         return rc;
721 }
722
723 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
724                          void *karg, void *uarg)
725 {
726         struct obd_device *obddev = class_conn2obd(conn);
727         struct obd_ioctl_data *data = karg;
728         int err = 0;
729         ENTRY;
730
731         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
732             _IOC_NR(cmd) < IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
733                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
734                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
735                 RETURN(-EINVAL);
736         }
737
738         switch (cmd) {
739         case IOC_LDLM_TEST: {
740                 err = ldlm_test(obddev, conn);
741                 CERROR("-- done err %d\n", err);
742                 GOTO(out, err);
743         }
744         case IOC_LDLM_REGRESS_START: {
745                 unsigned int numthreads = 1;
746                 unsigned int numheld = 10;
747                 unsigned int numres = 10;
748                 unsigned int numext = 10;
749                 char *parse;
750
751                 if (data->ioc_inllen1) {
752                         parse = data->ioc_inlbuf1;
753                         if (*parse != '\0') {
754                                 while(isspace(*parse)) parse++;
755                                 numthreads = simple_strtoul(parse, &parse, 0);
756                                 while(isspace(*parse)) parse++;
757                         }
758                         if (*parse != '\0') {
759                                 while(isspace(*parse)) parse++;
760                                 numheld = simple_strtoul(parse, &parse, 0);
761                                 while(isspace(*parse)) parse++;
762                         }
763                         if (*parse != '\0') {
764                                 while(isspace(*parse)) parse++;
765                                 numres = simple_strtoul(parse, &parse, 0);
766                                 while(isspace(*parse)) parse++;
767                         }
768                         if (*parse != '\0') {
769                                 while(isspace(*parse)) parse++;
770                                 numext = simple_strtoul(parse, &parse, 0);
771                                 while(isspace(*parse)) parse++;
772                         }
773                 }
774
775                 err = ldlm_regression_start(obddev, conn, numthreads,
776                                 numheld, numres, numext);
777
778                 CERROR("-- done err %d\n", err);
779                 GOTO(out, err);
780         }
781         case IOC_LDLM_REGRESS_STOP: {
782                 err = ldlm_regression_stop();
783                 CERROR("-- done err %d\n", err);
784                 GOTO(out, err);
785         }
786         default:
787                 GOTO(out, err = -EINVAL);
788         }
789 out:
790         return err;
791 }
792
793 struct obd_ops osc_obd_ops = {
794         o_setup:        client_obd_setup,
795         o_cleanup:      client_obd_cleanup,
796         o_statfs:       osc_statfs,
797         o_create:       osc_create,
798         o_destroy:      osc_destroy,
799         o_getattr:      osc_getattr,
800         o_setattr:      osc_setattr,
801         o_open:         osc_open,
802         o_close:        osc_close,
803         o_connect:      client_obd_connect,
804         o_disconnect:   client_obd_disconnect,
805         o_brw:          osc_brw,
806         o_punch:        osc_punch,
807         o_enqueue:      osc_enqueue,
808         o_cancel:       osc_cancel,
809         o_cancel_unused: osc_cancel_unused,
810         o_iocontrol:    osc_iocontrol
811 };
812
813 static int __init osc_init(void)
814 {
815         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
816 }
817
818 static void __exit osc_exit(void)
819 {
820         class_unregister_type(LUSTRE_OSC_NAME);
821 }
822
823 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
824 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
825 MODULE_LICENSE("GPL");
826
827 module_init(osc_init);
828 module_exit(osc_exit);