1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 * For testing and management it is treated as an obd_device,
23 * although * it does not export a full OBD method table (the
24 * requests are coming * in over the wire, so object target modules
25 * do not have a full * method table.)
30 #define DEBUG_SUBSYSTEM S_OSC
33 #include <linux/version.h>
34 #include <linux/module.h>
36 #include <linux/highmem.h>
37 #include <linux/lustre_dlm.h>
38 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
39 #include <linux/workqueue.h>
40 #include <linux/smp_lock.h>
42 #include <linux/locks.h>
45 #include <liblustre.h>
48 #include <linux/kp30.h>
49 #include <linux/lustre_mds.h> /* for mds_objid */
50 #include <linux/lustre_otree.h>
51 #include <linux/obd_ost.h>
52 #include <linux/obd_lov.h>
55 #include <linux/ctype.h>
56 #include <linux/init.h>
61 #include <linux/lustre_ha.h>
62 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
63 #include <linux/lustre_lite.h> /* for ll_i2info */
64 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
65 #include <linux/lprocfs_status.h>
/* Attach hook: register this OSC obd_device's lprocfs (/proc) variables.
 * NOTE(review): this excerpt is sampled — intervening lines (braces,
 * RETURNs, declarations) are elided throughout; code text kept verbatim. */
67 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
69 struct lprocfs_static_vars lvars;
71 lprocfs_init_vars(&lvars);
72 return lprocfs_obd_attach(dev, lvars.obd_vars);
/* Detach hook: tear down the lprocfs entries created in osc_attach(). */
75 static int osc_detach(struct obd_device *dev)
77 return lprocfs_obd_detach(dev);
80 /* Pack OSC object metadata for disk storage (LE byte order). */
/* If *lmmp is already set, the caller wants it freed; otherwise allocate
 * a wire-format lov_mds_md and fill in the (little-endian) object id.
 * Presumably returns lmm_size on success — elided here, TODO confirm. */
81 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
82 struct lov_stripe_md *lsm)
87 lmm_size = sizeof(**lmmp);
/* free path: caller passed an existing buffer to release */
92 OBD_FREE(*lmmp, lmm_size);
/* allocate path: fresh wire buffer for the caller */
98 OBD_ALLOC(*lmmp, lmm_size);
/* an object id of 0 is never valid for a packed md */
104 LASSERT(lsm->lsm_object_id);
105 (*lmmp)->lmm_object_id = cpu_to_le64 (lsm->lsm_object_id);
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Inverse of osc_packmd: validate the wire lov_mds_md, then either free
 * an existing *lsmp or allocate a single-stripe lov_stripe_md and fill
 * it from the (little-endian) wire data. */
112 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
113 struct lov_mds_md *lmm, int lmm_bytes)
/* reject short buffers before touching any field */
119 if (lmm_bytes < sizeof (*lmm)) {
120 CERROR("lov_mds_md too small: %d, need %d\n",
121 lmm_bytes, (int)sizeof(*lmm));
124 /* XXX LOV_MAGIC etc check? */
126 if (lmm->lmm_object_id == cpu_to_le64 (0)) {
127 CERROR ("lov_mds_md: zero lmm_object_id\n");
/* an OSC md always describes exactly one stripe */
132 lsm_size = lov_stripe_md_size(1);
/* free path: caller passed an existing lsm to release */
137 OBD_FREE(*lsmp, lsm_size);
143 OBD_ALLOC(*lsmp, lsm_size);
/* wire the inline dirty-offset tree into place and initialise it */
147 (*lsmp)->lsm_oinfo[0].loi_dirty_ot =
148 &(*lsmp)->lsm_oinfo[0].loi_dirty_ot_inline;
149 ot_init((*lsmp)->lsm_oinfo[0].loi_dirty_ot);
153 /* XXX zero *lsmp? */
154 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
155 LASSERT((*lsmp)->lsm_object_id);
158 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* Maximum bytes per bulk RPC and the derived max pages per RPC
 * (also capped by the Portals MD iov limit). Hard-coded for now;
 * the #warning notes it should really be advertised by the OST. */
163 #warning "FIXME: make this be sent from OST"
164 #define OSC_BRW_MAX_SIZE 65536
165 #define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
/* Reply callback for osc_getattr_async(): unpack the ost_body from the
 * reply and copy the returned obdo into the caller's buffer. */
167 static int osc_getattr_interpret(struct ptlrpc_request *req,
168 struct osc_getattr_async_args *aa, int rc)
170 struct obdo *oa = aa->aa_oa;
171 struct ost_body *body;
175 CERROR("failed: rc = %d\n", rc);
/* swab handles cross-endian peers; NULL means a malformed reply */
179 body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_ost_body);
181 CERROR ("can't unpack ost_body\n");
185 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
186 memcpy(oa, &body->oa, sizeof(*oa));
188 /* This should really be sent by the OST */
189 oa->o_blksize = OSC_BRW_MAX_SIZE;
190 oa->o_valid |= OBD_MD_FLBLKSZ;
/* Asynchronous getattr: build an OST_GETATTR request carrying *oa, stash
 * the caller's obdo in rq_async_args, and queue it on @set; the result is
 * delivered via osc_getattr_interpret() when the set is processed. */
195 static int osc_getattr_async(struct lustre_handle *conn, struct obdo *oa,
196 struct lov_stripe_md *md,
197 struct ptlrpc_request_set *set)
199 struct ptlrpc_request *request;
200 struct ost_body *body;
201 int size = sizeof(*body);
202 struct osc_getattr_async_args *aa;
205 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
210 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
211 memcpy(&body->oa, oa, sizeof(*oa));
213 request->rq_replen = lustre_msg_size(1, &size);
214 request->rq_interpret_reply = osc_getattr_interpret;
/* per-request scratch space must be big enough for our args struct */
216 LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
217 aa = (struct osc_getattr_async_args *)&request->rq_async_args;
220 ptlrpc_set_add_req (set, request);
/* Synchronous getattr: send OST_GETATTR carrying *oa, wait for the reply,
 * and copy the returned attributes back into *oa. */
224 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
225 struct lov_stripe_md *md)
227 struct ptlrpc_request *request;
228 struct ost_body *body;
229 int rc, size = sizeof(*body);
232 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
237 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
238 memcpy(&body->oa, oa, sizeof(*oa));
240 request->rq_replen = lustre_msg_size(1, &size);
242 rc = ptlrpc_queue_wait(request);
244 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* swab handles cross-endian peers; NULL means a malformed reply */
248 body = lustre_swab_repbuf(request, 0, sizeof (*body),
249 lustre_swab_ost_body);
251 CERROR ("can't unpack ost_body\n");
252 GOTO (out, rc = -EPROTO);
255 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
256 memcpy(oa, &body->oa, sizeof(*oa));
258 /* This should really be sent by the OST */
259 oa->o_blksize = OSC_BRW_MAX_SIZE;
260 oa->o_valid |= OBD_MD_FLBLKSZ;
/* common exit: release the request on both success and error paths */
264 ptlrpc_req_finished(request);
268 /* The import lock must already be held. */
/* Walk a request list (imp_sending_list / imp_delayed_list) and rewrite
 * the file handle embedded in any queued request of opcode @op whose
 * handle matches @old, replacing it with @new. Used after open replay
 * re-issues handles so pending closes target the new handle. */
269 static inline void osc_update_body_handle(struct list_head *head,
270 struct lustre_handle *old,
271 struct lustre_handle *new, int op)
273 struct list_head *tmp;
274 struct ost_body *body;
275 struct ptlrpc_request *req;
276 struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
278 list_for_each(tmp, head) {
279 req = list_entry(tmp, struct ptlrpc_request, rq_list);
281 /* XXX ok to remove when bug 1303 resolved - rread 05/27/03 */
/* guards against an infinite loop over a corrupted list */
282 LASSERT (req != last_req);
/* only requests of the requested opcode (e.g. OST_CLOSE) qualify */
285 if (req->rq_reqmsg->opc != op)
287 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
288 if (memcmp(obdo_handle(&body->oa), old, sizeof(*old)))
291 DEBUG_REQ(D_HA, req, "updating close body with new fh");
292 memcpy(obdo_handle(&body->oa), new, sizeof(*new));
/* Replay callback for an open request: after recovery the OST hands back
 * a fresh file handle in the replayed reply; record it in the client
 * handle (och) and patch any in-flight/delayed OST_CLOSE requests that
 * still carry the stale pre-recovery handle. */
296 static void osc_replay_open(struct ptlrpc_request *req)
298 struct lustre_handle old;
299 struct ost_body *body;
300 struct obd_client_handle *och = req->rq_replay_data;
301 struct lustre_handle *oa_handle;
304 body = lustre_swab_repbuf (req, 0, sizeof (*body),
305 lustre_swab_ost_body);
306 LASSERT (body != NULL);
308 oa_handle = obdo_handle(&body->oa);
/* remember the stale handle so we can find requests that reference it */
310 memcpy(&old, &och->och_fh, sizeof(old));
311 CDEBUG(D_HA, "updating cookie from "LPD64" to "LPD64"\n",
312 och->och_fh.cookie, oa_handle->cookie);
313 memcpy(&och->och_fh, oa_handle, sizeof(och->och_fh));
315 /* A few frames up, ptlrpc_replay holds the lock, so this is safe. */
316 osc_update_body_handle(&req->rq_import->imp_sending_list, &old,
317 &och->och_fh, OST_CLOSE);
318 osc_update_body_handle(&req->rq_import->imp_delayed_list, &old,
319 &och->och_fh, OST_CLOSE);
/* Open an object on the OST. The request is marked replayable so the
 * open can be re-sent after OST recovery; the returned file handle is
 * cached in @och together with a reference on the open request so
 * osc_replay_open() can refresh the handle later. */
324 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
325 struct lov_stripe_md *md, struct obd_trans_info *oti,
326 struct obd_client_handle *och)
328 struct ptlrpc_request *request;
329 struct ost_body *body;
331 int rc, size = sizeof(*body);
333 LASSERT(och != NULL);
335 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
/* mark replayable under rq_lock so recovery will re-send this open */
340 spin_lock_irqsave (&request->rq_lock, flags);
341 request->rq_replay = 1;
342 spin_unlock_irqrestore (&request->rq_lock, flags);
344 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
345 memcpy(&body->oa, oa, sizeof(*oa));
347 request->rq_replen = lustre_msg_size(1, &size);
349 rc = ptlrpc_queue_wait(request);
353 body = lustre_swab_repbuf (request, 0, sizeof (*body),
354 lustre_swab_ost_body);
356 CERROR ("Can't unpack ost_body\n");
357 GOTO (out, rc = -EPROTO);
360 memcpy(oa, &body->oa, sizeof(*oa));
362 /* If the open succeeded, we better have a handle */
363 /* BlueArc OSTs don't send back (o_valid | FLHANDLE). sigh.
364 * Temporary workaround until fixed. -phil 24 Feb 03 */
365 // if ((oa->o_valid & OBD_MD_FLHANDLE) == 0) {
366 // CERROR ("No file handle\n");
367 // GOTO (out, rc = -EPROTO);
/* force the flag on since the check above is disabled */
369 oa->o_valid |= OBD_MD_FLHANDLE;
371 /* This should really be sent by the OST */
372 oa->o_blksize = OSC_BRW_MAX_SIZE;
373 oa->o_valid |= OBD_MD_FLBLKSZ;
/* cache the handle + a request ref so replay can patch it later */
375 memcpy(&och->och_fh, obdo_handle(oa), sizeof(och->och_fh));
376 request->rq_replay_cb = osc_replay_open;
377 request->rq_replay_data = och;
378 och->och_req = ptlrpc_request_addref(request);
379 och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
383 ptlrpc_req_finished(request);
/* Close an object on the OST. The client handle is carried inline in
 * oa->o_inline; a zero och_magic means the file was never opened on this
 * OST (e.g. it was inactive), in which case there is nothing to close.
 * Also cancels the replay status of the matching open request. */
387 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
388 struct lov_stripe_md *md, struct obd_trans_info *oti)
390 struct obd_import *import = class_conn2cliimp(conn);
391 struct ptlrpc_request *request;
392 struct ost_body *body;
393 struct obd_client_handle *och;
395 int rc, size = sizeof(*body);
399 och = (struct obd_client_handle *)&oa->o_inline;
400 if (och->och_magic == 0) {
401 /* Zero magic means that this file was never opened on this
402 * OST--almost certainly because the OST was inactive at
/* anything else must be a genuine open handle */
406 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
408 request = ptlrpc_prep_req(import, OST_CLOSE, 1, &size, NULL);
412 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
413 memcpy(&body->oa, oa, sizeof(*oa));
415 request->rq_replen = lustre_msg_size(1, &size);
417 rc = ptlrpc_queue_wait(request);
419 CDEBUG(D_HA, "Suppressing close error %d\n", rc); // bug 1036
421 /* och_req == NULL can't happen any more, right? --phik */
422 if (och->och_req != NULL) {
/* stop the open from being replayed now that it is closed */
423 spin_lock_irqsave(&import->imp_lock, flags);
424 spin_lock (&och->och_req->rq_lock);
425 och->och_req->rq_replay = 0;
426 spin_unlock (&och->och_req->rq_lock);
427 /* see comments in llite/file.c:ll_mdc_close() */
428 if (och->och_req->rq_transno) {
429 /* this can't happen yet, because the OSTs don't yet
430 * issue transnos for OPEN requests -phik 21 Apr 2003 */
/* inherit the open's transno so the close is retained for replay */
432 if (!request->rq_transno && import->imp_replayable) {
433 request->rq_transno = och->och_req->rq_transno;
434 ptlrpc_retain_replayable_request(request,
437 spin_unlock_irqrestore(&import->imp_lock, flags);
439 spin_unlock_irqrestore(&import->imp_lock, flags);
/* drop the reference taken in osc_open() */
442 ptlrpc_req_finished(och->och_req);
446 body = lustre_swab_repbuf (request, 0, sizeof (*body),
447 lustre_swab_ost_body);
450 CDEBUG(D_HA, "Suppressing close error %d\n", rc); // bug 1036
452 memcpy(oa, &body->oa, sizeof(*oa));
455 ptlrpc_req_finished(request);
/* Synchronous setattr: send the caller's obdo in an OST_SETATTR request
 * and wait for completion. The reply body is not unpacked here. */
459 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
460 struct lov_stripe_md *md, struct obd_trans_info *oti)
462 struct ptlrpc_request *request;
463 struct ost_body *body;
464 int rc, size = sizeof(*body);
467 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
472 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
473 memcpy(&body->oa, oa, sizeof(*oa));
475 request->rq_replen = lustre_msg_size(1, &size);
477 rc = ptlrpc_queue_wait(request);
479 ptlrpc_req_finished(request);
/* Create an object on the OST. Allocates a stripe md (if the caller did
 * not supply one in *ea), sends OST_CREATE, and fills the lsm with the
 * new object id. On failure the locally-allocated lsm is freed. */
483 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
484 struct lov_stripe_md **ea, struct obd_trans_info *oti)
486 struct ptlrpc_request *request;
487 struct ost_body *body;
488 struct lov_stripe_md *lsm;
489 int rc, size = sizeof(*body);
497 rc = obd_alloc_memmd(conn, &lsm);
502 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
505 GOTO(out, rc = -ENOMEM);
507 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
508 memcpy(&body->oa, oa, sizeof(*oa));
510 request->rq_replen = lustre_msg_size(1, &size);
512 rc = ptlrpc_queue_wait(request);
516 body = lustre_swab_repbuf (request, 0, sizeof (*body),
517 lustre_swab_ost_body);
519 CERROR ("can't unpack ost_body\n");
520 GOTO (out_req, rc = -EPROTO);
523 memcpy(oa, &body->oa, sizeof(*oa));
525 /* This should really be sent by the OST */
526 oa->o_blksize = OSC_BRW_MAX_SIZE;
527 oa->o_valid |= OBD_MD_FLBLKSZ;
529 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
530 * have valid lsm_oinfo data structs, so don't go touching that.
531 * This needs to be fixed in a big way.
533 lsm->lsm_object_id = oa->o_id;
534 lsm->lsm_stripe_count = 0;
535 lsm->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/* propagate the server transno for recovery bookkeeping */
539 oti->oti_transno = request->rq_repmsg->transno;
541 CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
544 ptlrpc_req_finished(request);
/* error path: release the lsm we allocated above */
547 obd_free_memmd(conn, &lsm);
/* Punch (truncate a byte range of) an object. The [start, end) range is
 * smuggled through the obdo's o_size/o_blocks fields as documented in
 * the inline comment below. */
551 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
552 struct lov_stripe_md *md, obd_size start,
553 obd_size end, struct obd_trans_info *oti)
555 struct ptlrpc_request *request;
556 struct ost_body *body;
557 int rc, size = sizeof(*body);
565 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
570 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
571 memcpy(&body->oa, oa, sizeof(*oa));
573 /* overload the size and blocks fields in the oa with start/end */
574 body->oa.o_size = start;
575 body->oa.o_blocks = end;
576 body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
578 request->rq_replen = lustre_msg_size(1, &size);
580 rc = ptlrpc_queue_wait(request);
584 body = lustre_swab_repbuf (request, 0, sizeof (*body),
585 lustre_swab_ost_body);
587 CERROR ("can't unpack ost_body\n");
588 GOTO (out, rc = -EPROTO);
591 memcpy(oa, &body->oa, sizeof(*oa));
595 ptlrpc_req_finished(request);
/* Destroy an object on the OST: send OST_DESTROY carrying *oa and copy
 * the returned attributes back on success. */
599 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
600 struct lov_stripe_md *ea, struct obd_trans_info *oti)
602 struct ptlrpc_request *request;
603 struct ost_body *body;
604 int rc, size = sizeof(*body);
611 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
616 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
617 memcpy(&body->oa, oa, sizeof(*oa));
619 request->rq_replen = lustre_msg_size(1, &size);
621 rc = ptlrpc_queue_wait(request);
625 body = lustre_swab_repbuf (request, 0, sizeof (*body),
626 lustre_swab_ost_body);
628 CERROR ("Can't unpack body\n");
629 GOTO (out, rc = -EPROTO);
632 memcpy(oa, &body->oa, sizeof(*oa));
636 ptlrpc_req_finished(request);
/* Piggy-back the client's dirty-cache accounting onto an outgoing BRW
 * request body: o_blocks carries bytes dirty, o_rdev carries the grant.
 * Reads cl_dirty/cl_dirty_granted under cl_dirty_sem. */
640 static void osc_announce_cached(struct client_obd *cli, struct ost_body *body)
642 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV;
/* the caller must not already be using these (overloaded) fields */
644 LASSERT(!(body->oa.o_valid & bits));
646 body->oa.o_valid |= bits;
647 down(&cli->cl_dirty_sem);
648 body->oa.o_blocks = cli->cl_dirty;
649 body->oa.o_rdev = cli->cl_dirty_granted;
650 up(&cli->cl_dirty_sem);
651 CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n",
652 cli->cl_dirty, cli->cl_dirty_granted);
/* Digest the grant information returned in a BRW reply: if the OST did
 * not set OBD_MD_FLRDEV it cannot grant, otherwise record the new grant
 * (carried in the overloaded o_rdev field) under cl_dirty_sem. */
655 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
657 if(!(body->oa.o_valid & OBD_MD_FLRDEV)) {
/* only log the transition the first time we notice it */
658 if (cli->cl_ost_can_grant) {
659 CDEBUG(D_INODE, "%s can't grant\n",
660 cli->cl_import->imp_target_uuid.uuid);
662 cli->cl_ost_can_grant = 0;
666 CDEBUG(D_INODE, "got "LPU64" grant\n", body->oa.o_rdev);
667 down(&cli->cl_dirty_sem);
668 cli->cl_dirty_granted = body->oa.o_rdev;
669 /* XXX check for over-run and wake up the io thread that
670 * doesn't exist yet */
671 up(&cli->cl_dirty_sem);
674 /* We assume that the reason this OSC got a short read is because it read
675 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
676 * via the LOV, and it _knows_ it's reading inside the file, it's just that
677 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the portion of the page array past @nob_read: partially-read
 * pages are zeroed from the EOF point, fully-unread pages are zeroed
 * entirely. (kunmap calls are elided in this excerpt.) */
678 static void handle_short_read(int nob_read, obd_count page_count,
679 struct brw_page *pga)
683 /* skip bytes read OK */
684 while (nob_read > 0) {
685 LASSERT (page_count > 0);
687 if (pga->count > nob_read) {
688 /* EOF inside this page */
689 ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
690 memset(ptr + nob_read, 0, pga->count - nob_read);
697 nob_read -= pga->count;
702 /* zero remaining pages */
703 while (page_count-- > 0) {
704 ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
705 memset(ptr, 0, pga->count);
/* Validate the per-niobuf return-code vector in a BRW_WRITE reply:
 * swab it if the peer is other-endian, return the first negative rc,
 * and treat any other non-zero rc as a protocol error. */
711 static int check_write_rcs (struct ptlrpc_request *request,
712 int niocount, obd_count page_count,
713 struct brw_page *pga)
718 /* return error if any niobuf was in error */
719 remote_rcs = lustre_swab_repbuf(request, 1,
720 sizeof(*remote_rcs) * niocount, NULL);
721 if (remote_rcs == NULL) {
722 CERROR ("Missing/short RC vector on BRW_WRITE reply\n");
/* plain __u32 array: no swab helper, fix endianness by hand */
725 if (lustre_msg_swabbed (request->rq_repmsg))
726 for (i = 0; i < niocount; i++)
727 __swab32s (&remote_rcs[i]);
729 for (i = 0; i < niocount; i++) {
730 if (remote_rcs[i] < 0)
731 return (remote_rcs[i]);
733 if (remote_rcs[i] != 0) {
734 CERROR ("rc[%d] invalid (%d) req %p\n",
735 i, remote_rcs[i], request);
/* Two brw_pages can share one remote niobuf only when they have equal
 * flags and are byte-contiguous in object offset. */
743 static inline int can_merge_pages (struct brw_page *p1, struct brw_page *p2)
745 if (p1->flag != p2->flag) {
746 /* XXX we don't make much use of 'flag' right now
747 * but this will warn about usage when we do */
748 CERROR ("different flags set %d, %d\n",
753 return (p1->off + p1->count == p2->off);
/* Fold the first @nob bytes of the page array into an OST checksum,
 * mapping each page and checksumming at most its in-page extent.
 * (Loop advance/kunmap lines are elided in this excerpt.) */
757 static obd_count cksum_pages(int nob, obd_count page_count,
758 struct brw_page *pga)
765 LASSERT (page_count > 0);
767 ptr = kmap (pga->pg);
768 ost_checksum (&cksum, ptr + (pga->off & (PAGE_SIZE - 1)),
769 pga->count > nob ? nob : pga->count);
/* Build (but do not send) a bulk read/write RPC for @page_count pages:
 * counts mergeable pages into @niocountp remote niobufs, preps the bulk
 * descriptor, fills the ost_body/obd_ioobj/niobuf_remote buffers, and
 * returns the request via *reqp with resends disabled (bug 937). */
781 static int osc_brw_prep_request(struct obd_import *imp,
782 struct lov_stripe_md *lsm, obd_count page_count,
783 struct brw_page *pga, int cmd,
784 int *requested_nobp, int *niocountp,
785 struct ptlrpc_request **reqp)
787 struct ptlrpc_request *req;
788 struct ptlrpc_bulk_desc *desc;
789 struct client_obd *cli = &imp->imp_obd->u.cli;
790 struct ost_body *body;
791 struct obd_ioobj *ioobj;
792 struct niobuf_remote *niobuf;
801 opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
/* adjacent mergeable pages share a single remote niobuf */
803 for (niocount = i = 1; i < page_count; i++)
804 if (!can_merge_pages (&pga[i - 1], &pga[i]))
807 size[0] = sizeof (*body);
808 size[1] = sizeof (*ioobj);
809 size[2] = niocount * sizeof (*niobuf);
811 req = ptlrpc_prep_req (imp, opc, 3, size, NULL);
/* write: server GETs from us; read: server PUTs into us */
815 if (opc == OST_WRITE)
816 desc = ptlrpc_prep_bulk_imp(req, BULK_GET_SOURCE,
819 desc = ptlrpc_prep_bulk_imp(req, BULK_PUT_SINK,
822 GOTO (out, rc = -ENOMEM);
823 /* NB request now owns desc and will free it when it gets freed */
825 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
826 ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
827 niobuf = lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf));
829 ioobj->ioo_id = lsm->lsm_object_id;
831 ioobj->ioo_type = S_IFREG;
832 ioobj->ioo_bufcnt = niocount;
834 LASSERT (page_count > 0);
835 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
836 struct brw_page *pg = &pga[i];
837 struct brw_page *pg_prev = pg - 1;
/* pages must be non-empty, within one page, and strictly ascending */
839 LASSERT (pg->count > 0);
840 LASSERT ((pg->off & (PAGE_SIZE - 1)) + pg->count <= PAGE_SIZE);
841 LASSERT (i == 0 || pg->off > pg_prev->off);
843 rc = ptlrpc_prep_bulk_page (desc, pg->pg,
844 pg->off & (PAGE_SIZE - 1),
849 requested_nob += pg->count;
/* either extend the previous niobuf or start a new one */
851 if (i > 0 && can_merge_pages (pg_prev, pg)) {
853 niobuf->len += pg->count;
855 niobuf->offset = pg->off;
856 niobuf->len = pg->count;
857 niobuf->flags = pg->flag;
/* sanity: we filled exactly the niobuf array we sized above */
861 LASSERT ((void *)(niobuf - niocount) ==
862 lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
864 body->oa.o_valid |= OBD_MD_FLCKSUM;
/* NOTE(review): opc was set to OST_WRITE/OST_READ above, but this
 * compares against OST_BRW_WRITE — if those constants differ, the
 * write checksum (stored in o_nlink) is never computed even though
 * OBD_MD_FLCKSUM is set. Confirm against the ost opcode definitions. */
865 if (opc == OST_BRW_WRITE)
866 body->oa.o_nlink = cksum_pages (requested_nob, page_count, pga);
868 osc_announce_cached(cli, body);
/* bulk requests must not be transparently resent (see bug 937) */
869 spin_lock_irqsave (&req->rq_lock, flags);
870 req->rq_no_resend = 1;
871 spin_unlock_irqrestore (&req->rq_lock, flags);
873 /* size[0] still sizeof (*body) */
874 if (opc == OST_WRITE) {
875 /* 1 RC per niobuf */
876 size[1] = sizeof(__u32) * niocount;
877 req->rq_replen = lustre_msg_size(2, size);
879 /* 1 RC for the whole I/O */
880 req->rq_replen = lustre_msg_size(1, size);
883 *niocountp = niocount;
884 *requested_nobp = requested_nob;
/* error path: release the (desc-owning) request */
889 ptlrpc_req_finished (req);
/* Common completion for sync/async BRW: unpack the reply body, absorb
 * grant info, validate write rcs (writes) or handle short reads and
 * verify the server checksum when present (reads). @rc is the bulk
 * transfer result: bytes moved for reads, 0/-errno for writes. */
893 static int osc_brw_fini_request (struct ptlrpc_request *req,
894 int requested_nob, int niocount,
895 obd_count page_count, struct brw_page *pga,
898 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
899 struct ost_body *body;
903 body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_ost_body);
905 CERROR ("Can't unpack body\n");
908 osc_update_grant(cli, body);
910 if (req->rq_reqmsg->opc == OST_WRITE) {
/* writes report success/failure, never a byte count */
912 CERROR ("Unexpected +ve rc %d\n", rc);
916 return (check_write_rcs(req, niocount, page_count, pga));
/* reads: rc is the number of bytes actually transferred */
919 if (rc > requested_nob) {
920 CERROR ("Unexpected rc %d (%d requested)\n",
925 if (rc < requested_nob)
926 handle_short_read(rc, page_count, pga);
929 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
930 static int cksum_counter;
/* server checksum rides in the overloaded o_nlink field */
931 obd_count server_cksum = body->oa.o_nlink;
932 obd_count cksum = cksum_pages(rc, page_count, pga);
935 if (server_cksum != cksum) {
936 CERROR("Bad checksum: server "LPX64", client "LPX64
937 ", server NID "LPX64"\n", server_cksum, cksum,
938 imp->imp_connection->c_peer.peer_nid);
/* log OK checksums only at power-of-two intervals */
940 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
941 CERROR("Checksum %u from "LPX64" OK: %x\n",
943 imp->imp_connection->c_peer.peer_nid, cksum);
945 static int cksum_missed;
/* we asked for a checksum but the server didn't supply one */
947 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
948 CERROR("Request checksum %u from "LPX64", no reply\n",
950 imp->imp_connection->c_peer.peer_nid);
/* Synchronous bulk I/O: prep the BRW request, queue and wait, retry on
 * bulk timeout (resends of bulk RPCs are disabled, so the whole request
 * is rebuilt), then finish via osc_brw_fini_request(). */
956 static int osc_brw_internal(struct lustre_handle *conn,
957 struct lov_stripe_md *lsm,
958 obd_count page_count, struct brw_page *pga, int cmd)
962 struct ptlrpc_request *request;
967 rc = osc_brw_prep_request(class_conn2cliimp(conn), lsm, page_count, pga,
968 cmd, &requested_nob, &niocount, &request);
969 /* NB ^ sets rq_no_resend */
974 rc = ptlrpc_queue_wait(request);
976 if (rc == -ETIMEDOUT && request->rq_resend) {
977 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
/* discard and rebuild; presumably loops back to the prep above */
978 ptlrpc_req_finished(request);
982 rc = osc_brw_fini_request (request, requested_nob, niocount,
983 page_count, pga, rc);
985 ptlrpc_req_finished(request);
/* Reply callback for async BRW requests queued by async_internal():
 * recover the prep-time bookkeeping from rq_async_args and run the
 * common completion. Bulk-timeout resend is not implemented (bug 937),
 * hence the LBUG. */
989 static int brw_interpret(struct ptlrpc_request *request,
990 struct osc_brw_async_args *aa, int rc)
992 int requested_nob = aa->aa_requested_nob;
993 int niocount = aa->aa_nio_count;
994 obd_count page_count = aa->aa_page_count;
995 struct brw_page *pga = aa->aa_pga;
998 /* XXX bug 937 here */
999 if (rc == -ETIMEDOUT && request->rq_resend) {
1000 DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
1001 LBUG(); /* re-send. later. */
1002 //goto restart_bulk;
1005 rc = osc_brw_fini_request (request, requested_nob, niocount,
1006 page_count, pga, rc);
/* Asynchronous bulk I/O: prep the BRW request, stash the bookkeeping
 * needed by brw_interpret() in rq_async_args, and add the request to
 * the caller's set instead of waiting. */
1010 static int async_internal(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1011 obd_count page_count, struct brw_page *pga,
1012 struct ptlrpc_request_set *set, int cmd)
1014 struct ptlrpc_request *request;
1017 struct osc_brw_async_args *aa;
1021 rc = osc_brw_prep_request (class_conn2cliimp(conn),
1022 lsm, page_count, pga, cmd,
1023 &requested_nob, &nio_count, &request);
1024 /* NB ^ sets rq_no_resend */
/* per-request scratch space must be big enough for our args struct */
1027 LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
1028 aa = (struct osc_brw_async_args *)&request->rq_async_args;
1029 aa->aa_requested_nob = requested_nob;
1030 aa->aa_nio_count = nio_count;
1031 aa->aa_page_count = page_count;
1034 request->rq_interpret_reply = brw_interpret;
1035 ptlrpc_set_add_req(set, request);
/* Type-safe minimum using a GCC statement expression; each argument is
 * evaluated exactly once. NOTE(review): this may shadow/conflict with a
 * kernel-provided min_t depending on the build — confirm. */
1041 #define min_t(type,x,y) \
1042 ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
/*
1046 * ugh, we want disk allocation on the target to happen in offset order. we'll
1047 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1048 * fine for our small page arrays and doesn't require allocation. its an
1049 * insertion sort that swaps elements that are strides apart, shrinking the
1050 * stride down until its '1' and the array is sorted.
 */
/* In-place shellsort of @array by brw_page.off, ascending. */
1052 static void sort_brw_pages(struct brw_page *array, int num)
1055 struct brw_page tmp;
/* grow the stride via the 3h+1 (Knuth) sequence, then shrink it */
1059 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1064 for (i = stride ; i < num ; i++) {
/* stride-spaced insertion: shift larger elements up */
1067 while (j >= stride && array[j - stride].off > tmp.off) {
1068 array[j] = array[j - stride];
1073 } while (stride > 1);
1076 /* make sure we the regions we're passing to elan don't violate its '4
1077 * fragments' constraint. portal headers are a fragment, all full
1078 * PAGE_SIZE long pages count as 1 fragment, and each partial page
1079 * counts as a fragment. I think. see bug 934. */
/* Returns how many leading pages of @pg fit within the Elan fragment
 * budget; the caller issues that many per BRW and loops for the rest.
 * (Fragment-accounting details are partially elided in this excerpt.) */
1080 static obd_count check_elan_limit(struct brw_page *pg, obd_count pages)
1083 int saw_whole_frag = 0;
1086 for (i = 0 ; frags_left && i < pages ; pg++, i++) {
/* all full pages together consume a single fragment */
1087 if (pg->count == PAGE_SIZE) {
1088 if (!saw_whole_frag) {
/* Top-level synchronous BRW entry point. OBD_BRW_CHECK only probes
 * whether the import is usable. Otherwise the page array is split into
 * chunks of at most OSC_BRW_MAX_IOV pages, each chunk sorted by offset
 * and clipped to the Elan fragment limit, then sent synchronously. */
1099 static int osc_brw(int cmd, struct lustre_handle *conn,
1100 struct lov_stripe_md *md, obd_count page_count,
1101 struct brw_page *pga, struct obd_trans_info *oti)
1105 if (cmd == OBD_BRW_CHECK) {
1106 /* The caller just wants to know if there's a chance that this
1107 * I/O can succeed */
1108 struct obd_import *imp = class_conn2cliimp(conn);
1110 if (imp == NULL || imp->imp_invalid)
1115 while (page_count) {
1116 obd_count pages_per_brw;
1119 if (page_count > OSC_BRW_MAX_IOV)
1120 pages_per_brw = OSC_BRW_MAX_IOV;
1122 pages_per_brw = page_count;
1124 sort_brw_pages(pga, pages_per_brw);
/* may shrink the chunk further to satisfy Elan constraints */
1125 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1127 rc = osc_brw_internal(conn, md, pages_per_brw, pga, cmd);
1132 page_count -= pages_per_brw;
1133 pga += pages_per_brw;
/* Asynchronous counterpart of osc_brw(): same import check and same
 * chunking (sort + Elan clip), but each chunk is queued on @set via
 * async_internal() instead of being sent synchronously. */
1138 static int osc_brw_async(int cmd, struct lustre_handle *conn,
1139 struct lov_stripe_md *md, obd_count page_count,
1140 struct brw_page *pga, struct ptlrpc_request_set *set,
1141 struct obd_trans_info *oti)
1145 if (cmd == OBD_BRW_CHECK) {
1146 /* The caller just wants to know if there's a chance that this
1147 * I/O can succeed */
1148 struct obd_import *imp = class_conn2cliimp(conn);
1150 if (imp == NULL || imp->imp_invalid)
1155 while (page_count) {
1156 obd_count pages_per_brw;
1159 if (page_count > OSC_BRW_MAX_IOV)
1160 pages_per_brw = OSC_BRW_MAX_IOV;
1162 pages_per_brw = page_count;
1164 sort_brw_pages(pga, pages_per_brw);
/* may shrink the chunk further to satisfy Elan constraints */
1165 pages_per_brw = check_elan_limit(pga, pages_per_brw);
1167 rc = async_internal(conn, md, pages_per_brw, pga, set, cmd);
1172 page_count -= pages_per_brw;
1173 pga += pages_per_brw;
1179 /* Note: caller will lock/unlock, and set uptodate on the pages */
/* SAN read path (2.4 kernels only): ask the OST for the block mapping
 * of each page via OST_SAN_READ, then read the blocks directly from the
 * shared SAN block device with buffer heads. A zero offset in the reply
 * denotes a hole and the page is zero-filled instead. */
1180 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1181 static int sanosc_brw_read(struct lustre_handle *conn,
1182 struct lov_stripe_md *lsm,
1183 obd_count page_count,
1184 struct brw_page *pga)
1186 struct ptlrpc_request *request = NULL;
1187 struct ost_body *body;
1188 struct niobuf_remote *nioptr;
1189 struct obd_ioobj *iooptr;
1190 int rc, size[3] = {sizeof(*body)}, mapped = 0;
1194 /* XXX does not handle 'new' brw protocol */
1196 size[1] = sizeof(struct obd_ioobj);
1197 size[2] = page_count * sizeof(*nioptr);
1199 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
1204 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
1205 iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
1206 nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
1207 sizeof (*nioptr) * page_count);
1209 iooptr->ioo_id = lsm->lsm_object_id;
1211 iooptr->ioo_type = S_IFREG;
1212 iooptr->ioo_bufcnt = page_count;
/* one remote niobuf per page; pages must be locked and ascending */
1214 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1215 LASSERT(PageLocked(pga[mapped].pg));
1216 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
1218 nioptr->offset = pga[mapped].off;
1219 nioptr->len = pga[mapped].count;
1220 nioptr->flags = pga[mapped].flag;
1223 size[1] = page_count * sizeof(*nioptr);
1224 request->rq_replen = lustre_msg_size(2, size);
1226 rc = ptlrpc_queue_wait(request);
/* reply niobufs may need per-entry swabbing below */
1230 swab = lustre_msg_swabbed (request->rq_repmsg);
1231 LASSERT_REPSWAB (request, 1);
1232 nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
1234 /* nioptr missing or short */
1235 GOTO(out_req, rc = -EPROTO);
1239 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1240 struct page *page = pga[mapped].pg;
1241 struct buffer_head *bh;
1245 lustre_swab_niobuf_remote (nioptr);
1247 /* got san device associated */
1248 LASSERT(class_conn2obd(conn));
1249 dev = class_conn2obd(conn)->u.cli.cl_sandev;
/* zero offset == hole: zero-fill instead of reading the device */
1252 if (!nioptr->offset) {
1253 CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
1254 page->mapping->host->i_ino,
1256 memset(page_address(page), 0, PAGE_SIZE);
1260 if (!page->buffers) {
1261 create_empty_buffers(page, dev, PAGE_SIZE);
/* map the fresh buffer head to the block the OST told us about */
1264 clear_bit(BH_New, &bh->b_state);
1265 set_bit(BH_Mapped, &bh->b_state);
1266 bh->b_blocknr = (unsigned long)nioptr->offset;
1268 clear_bit(BH_Uptodate, &bh->b_state);
1270 ll_rw_block(READ, 1, &bh);
1274 /* if buffer already existed, it must be the
1275 * one we mapped before, check it */
1276 LASSERT(!test_bit(BH_New, &bh->b_state));
1277 LASSERT(test_bit(BH_Mapped, &bh->b_state));
1278 LASSERT(bh->b_blocknr == (unsigned long)nioptr->offset);
1280 /* wait it's io completion */
1281 if (test_bit(BH_Lock, &bh->b_state))
1284 if (!test_bit(BH_Uptodate, &bh->b_state))
1285 ll_rw_block(READ, 1, &bh);
1289 /* must do syncronous write here */
1291 if (!buffer_uptodate(bh)) {
1299 ptlrpc_req_finished(request);
/* SAN write path (2.4 kernels only): ask the OST (OST_SAN_WRITE) for
 * the block mapping of each page, then write the pages directly to the
 * shared SAN block device through buffer heads, synchronously. */
1303 static int sanosc_brw_write(struct lustre_handle *conn,
1304 struct lov_stripe_md *lsm,
1305 obd_count page_count,
1306 struct brw_page *pga)
1308 struct ptlrpc_request *request = NULL;
1309 struct ost_body *body;
1310 struct niobuf_remote *nioptr;
1311 struct obd_ioobj *iooptr;
1312 int rc, size[3] = {sizeof(*body)}, mapped = 0;
1316 size[1] = sizeof(struct obd_ioobj);
1317 size[2] = page_count * sizeof(*nioptr);
1319 request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
1324 body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
1325 iooptr = lustre_msg_buf(request->rq_reqmsg, 1, sizeof (*iooptr));
1326 nioptr = lustre_msg_buf(request->rq_reqmsg, 2,
1327 sizeof (*nioptr) * page_count);
1329 iooptr->ioo_id = lsm->lsm_object_id;
1331 iooptr->ioo_type = S_IFREG;
1332 iooptr->ioo_bufcnt = page_count;
/* one remote niobuf per page; pages must be locked and ascending */
1335 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1336 LASSERT(PageLocked(pga[mapped].pg));
1337 LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off);
1339 nioptr->offset = pga[mapped].off;
1340 nioptr->len = pga[mapped].count;
1341 nioptr->flags = pga[mapped].flag;
1344 size[1] = page_count * sizeof(*nioptr);
1345 request->rq_replen = lustre_msg_size(2, size);
1347 rc = ptlrpc_queue_wait(request);
/* reply niobufs may need per-entry swabbing below */
1351 swab = lustre_msg_swabbed (request->rq_repmsg);
1352 LASSERT_REPSWAB (request, 1);
1353 nioptr = lustre_msg_buf(request->rq_repmsg, 1, size[1]);
1355 CERROR("absent/short niobuf array\n");
1356 GOTO(out_req, rc = -EPROTO);
1360 for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
1361 struct page *page = pga[mapped].pg;
1362 struct buffer_head *bh;
1366 lustre_swab_niobuf_remote (nioptr);
1368 /* got san device associated */
1369 LASSERT(class_conn2obd(conn));
1370 dev = class_conn2obd(conn)->u.cli.cl_sandev;
1372 if (!page->buffers) {
1373 create_empty_buffers(page, dev, PAGE_SIZE);
/* existing buffer must match the mapping the OST returned */
1376 LASSERT(!test_bit(BH_New, &page->buffers->b_state));
1377 LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
1378 LASSERT(page->buffers->b_blocknr ==
1379 (unsigned long)nioptr->offset);
1385 /* if buffer locked, wait it's io completion */
1386 if (test_bit(BH_Lock, &bh->b_state))
1389 clear_bit(BH_New, &bh->b_state);
1390 set_bit(BH_Mapped, &bh->b_state);
1392 /* override the block nr */
1393 bh->b_blocknr = (unsigned long)nioptr->offset;
1395 /* we are about to write it, so set it
1397 * page lock should garentee no race condition here */
1398 set_bit(BH_Uptodate, &bh->b_state);
1399 set_bit(BH_Dirty, &bh->b_state);
1401 ll_rw_block(WRITE, 1, &bh);
1403 /* must do syncronous write here */
/* write failed if the buffer is stale or still dirty afterwards */
1405 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
1413 ptlrpc_req_finished(request);
/* Top-level SAN bulk entry point: split a large I/O described by @pga
 * into chunks of at most OSC_BRW_MAX_IOV pages and dispatch each chunk
 * to sanosc_brw_write() or sanosc_brw_read() according to @cmd.
 * NOTE(review): the else-branches, rc declaration/error check and return
 * are on lines not visible in this view. */
1417 static int sanosc_brw(int cmd, struct lustre_handle *conn,
1418 struct lov_stripe_md *lsm, obd_count page_count,
1419 struct brw_page *pga, struct obd_trans_info *oti)
1423 while (page_count) {
1424 obd_count pages_per_brw;
/* Clamp each chunk to the maximum iovec the transport supports. */
1427 if (page_count > OSC_BRW_MAX_IOV)
1428 pages_per_brw = OSC_BRW_MAX_IOV;
1430 pages_per_brw = page_count;
1432 if (cmd & OBD_BRW_WRITE)
1433 rc = sanosc_brw_write(conn, lsm, pages_per_brw, pga);
1435 rc = sanosc_brw_read(conn, lsm, pages_per_brw, pga);
/* Advance past the chunk just issued. */
1440 page_count -= pages_per_brw;
1441 pga += pages_per_brw;
/* Account one newly-dirtied page against this client's dirty-grant.
 * Marks @offset in the stripe's dirty otree and bumps cl_dirty by one
 * page, all under cl_dirty_sem.  Fails with -EDQUOT when the OST grant
 * would be exceeded.
 * NOTE(review): rc declaration, the out: label and RETURN are on lines
 * not visible in this view. */
1448 static int osc_mark_page_dirty(struct lustre_handle *conn,
1449 struct lov_stripe_md *lsm, unsigned long offset)
1451 struct client_obd *cli = &class_conn2obd(conn)->u.cli;
1452 struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
1456 down(&cli->cl_dirty_sem);
/* If the OST enforces grants, refuse to dirty beyond what was granted. */
1458 if (cli->cl_ost_can_grant &&
1459 (cli->cl_dirty + PAGE_CACHE_SIZE >= cli->cl_dirty_granted)) {
1460 CDEBUG(D_INODE, "granted "LPU64" < "LPU64"\n",
1461 cli->cl_dirty_granted, cli->cl_dirty + PAGE_CACHE_SIZE);
1462 GOTO(out, rc = -EDQUOT);
/* Record the dirty page in the offset tree, then account it. */
1465 rc = ot_mark_offset(dirty_ot, offset);
1469 cli->cl_dirty += PAGE_CACHE_SIZE;
1470 CDEBUG(D_INODE, "dirtied off %lu, now "LPU64" bytes dirty\n",
1471 offset, cli->cl_dirty);
1473 up(&cli->cl_dirty_sem);
/* Clear the dirty-page records in [@start, @end] for this stripe and
 * return via @cleared how many pages were actually un-marked; cl_dirty
 * is decremented by the same amount.  Runs under cl_dirty_sem.
 * NOTE(review): rc declaration, error path and RETURN are on lines not
 * visible in this view. */
1477 static int osc_clear_dirty_pages(struct lustre_handle *conn,
1478 struct lov_stripe_md *lsm,
1479 unsigned long start, unsigned long end,
1480 unsigned long *cleared)
1482 struct client_obd *cli = &class_conn2obd(conn)->u.cli;
1483 struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
1484 unsigned long old_marked, new_marked;
1488 down(&cli->cl_dirty_sem);
/* Compute the cleared count as the delta of marked pages around the
 * ot_clear_extent() call. */
1490 old_marked = ot_num_marked(dirty_ot);
1492 rc = ot_clear_extent(dirty_ot, start, end);
1496 new_marked = ot_num_marked(dirty_ot);
1498 LASSERT(new_marked <= old_marked);
1499 LASSERT(old_marked * PAGE_CACHE_SIZE <= cli->cl_dirty);
1500 *cleared = old_marked - new_marked;
1501 cli->cl_dirty -= (__u64)*cleared << PAGE_CACHE_SHIFT;
1502 CDEBUG(D_INODE, "cleared [%lu,%lu], now "LPU64" bytes dirty\n",
1503 start, end, cli->cl_dirty);
1506 up(&cli->cl_dirty_sem);
/* Return (via @offset) the highest page offset currently marked dirty
 * in this stripe's otree; rc propagates ot_last_marked()'s result.
 * NOTE(review): surrounding braces/RETURN are on lines not visible here. */
1510 static int osc_last_dirty_offset(struct lustre_handle *conn,
1511 struct lov_stripe_md *lsm,
1512 unsigned long *offset)
1514 struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
1518 rc = ot_last_marked(dirty_ot, offset);
/* Acquire an extent lock on the object named by @lsm.
 * First tries to match an existing lock of the requested mode; for PR
 * requests it will also piggyback on an existing PW lock (the VFS/page
 * cache protects local readers).  Falls back to a full ldlm enqueue.
 *
 * FIX(review): the cookie length passed to ldlm_lock_match() and
 * ldlm_cli_enqueue() was sizeof(extent) — the size of a *pointer* —
 * instead of the caller-supplied extent_len (sizeof(struct ldlm_extent)),
 * which was previously unused.  All three call sites now pass extent_len. */
1522 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1523 struct lustre_handle *parent_lock,
1524 __u32 type, void *extentp, int extent_len, __u32 mode,
1525 int *flags, void *callback, void *data,
1526 struct lustre_handle *lockh)
1528 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1529 struct obd_device *obddev = class_conn2obd(connh);
1530 struct ldlm_extent *extent = extentp;
1534 /* Filesystem lock extents are extended to page boundaries so that
1535 * dealing with the page cache is a little smoother. */
1536 extent->start -= extent->start & ~PAGE_MASK;
1537 extent->end |= ~PAGE_MASK;
1539 /* Next, search for already existing extent locks that will cover us */
1540 rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_MATCH_DATA, &res_id,
1541 type, extent, extent_len, mode, data, lockh);
1543 /* We already have a lock, and it's referenced */
1546 /* If we're trying to read, we also search for an existing PW lock. The
1547 * VFS and page cache already protect us locally, so lots of readers/
1548 * writers can share a single PW lock.
1550 * There are problems with conversion deadlocks, so instead of
1551 * converting a read lock to a write lock, we'll just enqueue a new
1554 * At some point we should cancel the read lock instead of making them
1555 * send us a blocking callback, but there are problems with canceling
1556 * locks out from other users right now, too. */
1558 if (mode == LCK_PR) {
1559 rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_MATCH_DATA,
1560 &res_id, type, extent, extent_len,
1561 LCK_PW, data, lockh)
1563 /* FIXME: This is not incredibly elegant, but it might
1564 * be more elegant than adding another parameter to
1565 * lock_match. I want a second opinion. */
1566 ldlm_lock_addref(lockh, LCK_PR);
1567 ldlm_lock_decref(lockh, LCK_PW);
1573 rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
1574 res_id, type, extent, extent_len, mode, flags,
1575 ldlm_completion_ast, callback, data, lockh);
/* Non-enqueuing variant of osc_enqueue(): only match an already-granted
 * lock covering the (page-aligned) extent.  For PR requests an existing
 * PW lock is also acceptable and is re-referenced as PR.
 *
 * FIX(review): as in osc_enqueue(), the cookie length passed to
 * ldlm_lock_match() was sizeof(extent) — the size of a *pointer* — while
 * the caller-supplied extent_len parameter went unused.  Both call sites
 * now pass extent_len. */
1579 static int osc_match(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1580 __u32 type, void *extentp, int extent_len, __u32 mode,
1581 int *flags, void *data, struct lustre_handle *lockh)
1583 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1584 struct obd_device *obddev = class_conn2obd(connh);
1585 struct ldlm_extent *extent = extentp;
1589 /* Filesystem lock extents are extended to page boundaries so that
1590 * dealing with the page cache is a little smoother */
1591 extent->start -= extent->start & ~PAGE_MASK;
1592 extent->end |= ~PAGE_MASK;
1594 /* Next, search for already existing extent locks that will cover us */
1595 rc = ldlm_lock_match(obddev->obd_namespace, *flags, &res_id, type,
1596 extent, extent_len, mode, data, lockh);
1600 /* If we're trying to read, we also search for an existing PW lock. The
1601 * VFS and page cache already protect us locally, so lots of readers/
1602 * writers can share a single PW lock. */
1603 if (mode == LCK_PR) {
1604 rc = ldlm_lock_match(obddev->obd_namespace, *flags, &res_id,
1605 type, extent, extent_len, LCK_PW,
1608 /* FIXME: This is not incredibly elegant, but it might
1609 * be more elegant than adding another parameter to
1610 * lock_match. I want a second opinion. */
1611 ldlm_lock_addref(lockh, LCK_PR);
1612 ldlm_lock_decref(lockh, LCK_PW);
/* Release one reference on the lock in @lockh at the given @mode.
 * NOTE(review): braces and the return statement are on lines not
 * visible in this view; @oconn and @md appear unused here. */
1618 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
1619 __u32 mode, struct lustre_handle *lockh)
1623 ldlm_lock_decref(lockh, mode);
/* Cancel all unused locks on the resource named by @lsm's object id,
 * delegating to ldlm_cli_cancel_unused() with @flags (trailing @opaque
 * argument is on a line not visible in this view). */
1628 static int osc_cancel_unused(struct lustre_handle *connh,
1629 struct lov_stripe_md *lsm, int flags, void *opaque)
1631 struct obd_device *obddev = class_conn2obd(connh);
1632 struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1634 return ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags,
/* Fetch filesystem statistics from the OST via an OST_STATFS RPC and
 * copy the (byte-swap-checked) reply into @osfs.
 * NOTE(review): null-checks after prep_req/queue_wait and the out:
 * label/RETURN are on lines not visible in this view. */
1638 static int osc_statfs(struct obd_export *exp, struct obd_statfs *osfs)
1640 struct obd_statfs *msfs;
1641 struct ptlrpc_request *request;
1642 int rc, size = sizeof(*osfs);
1645 request = ptlrpc_prep_req(exp->exp_obd->u.cli.cl_import, OST_STATFS, 0,
/* Single reply buffer sized for an obd_statfs. */
1650 request->rq_replen = lustre_msg_size(1, &size);
1652 rc = ptlrpc_queue_wait(request);
1654 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
/* Unpack (and byte-swap if needed) the statfs reply. */
1658 msfs = lustre_swab_repbuf (request, 0, sizeof (*msfs),
1659 lustre_swab_obd_statfs);
1661 CERROR ("Can't unpack obd_statfs\n");
1662 GOTO (out, rc = -EPROTO);
1665 memcpy (osfs, msfs, sizeof (*msfs));
1669 ptlrpc_req_finished(request);
1673 /* Retrieve object striping information.
1675 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
1676 * the maximum number of OST indices which will fit in the user buffer.
1677 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 *
 * Builds a kernel-side lov_mds_md describing the single-stripe object in
 * @lsm and copies it back to userspace at @lmmu.
 * NOTE(review): the checks on copy_from_user()/OBD_ALLOC results and the
 * error returns are on lines not visible in this view; copy_from_user()
 * returns the number of bytes NOT copied, so rc here is 0 on success.
 */
1679 static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1680 struct lov_mds_md *lmmu)
1682 struct lov_mds_md lmm, *lmmk;
1689 rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
/* Validate the user-supplied header before trusting its counts. */
1693 if (lmm.lmm_magic != LOV_MAGIC)
1696 if (lmm.lmm_ost_count < 1)
/* Header plus exactly one object slot (single-stripe). */
1699 lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
1700 OBD_ALLOC(lmmk, lmm_size);
1704 lmmk->lmm_stripe_count = 1;
1705 lmmk->lmm_ost_count = 1;
1706 lmmk->lmm_object_id = lsm->lsm_object_id;
1707 lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
1709 if (copy_to_user(lmmu, lmmk, lmm_size))
1712 OBD_FREE(lmmk, lmm_size);
/* ioctl dispatcher for the OSC device.  Handles LOV registration,
 * config queries, stripe get/set, import recovery and activation.
 * NOTE(review): the switch statement, break statements, several closing
 * braces and the out:/RETURN lines are not visible in this view. */
1717 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1718 void *karg, void *uarg)
1720 struct obd_device *obddev = class_conn2obd(conn);
1721 struct obd_ioctl_data *data = karg;
/* Remember the LOV sitting above us; only one may register. */
1726 case IOC_OSC_REGISTER_LOV: {
1727 if (obddev->u.cli.cl_containing_lov)
1728 GOTO(out, err = -EALREADY);
1729 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
/* Fake a one-target LOV config so tools see this OSC as a 1-OST LOV. */
1732 case OBD_IOC_LOV_GET_CONFIG: {
1734 struct lov_desc *desc;
1735 struct obd_uuid uuid;
1739 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1740 GOTO(out, err = -EINVAL);
1742 data = (struct obd_ioctl_data *)buf;
/* Both inline buffers must be large enough for what we write back. */
1744 if (sizeof(*desc) > data->ioc_inllen1) {
1746 GOTO(out, err = -EINVAL);
1749 if (data->ioc_inllen2 < sizeof(uuid)) {
1751 GOTO(out, err = -EINVAL);
1754 desc = (struct lov_desc *)data->ioc_inlbuf1;
1755 desc->ld_tgt_count = 1;
1756 desc->ld_active_tgt_count = 1;
1757 desc->ld_default_stripe_count = 1;
1758 desc->ld_default_stripe_size = 0;
1759 desc->ld_default_stripe_offset = 0;
1760 desc->ld_pattern = 0;
1761 memcpy(&desc->ld_uuid, &obddev->obd_uuid, sizeof(uuid));
1763 memcpy(data->ioc_inlbuf2, &obddev->obd_uuid, sizeof(uuid));
/* NOTE(review): copy_to_user() returns the number of bytes NOT copied,
 * so on partial failure err is a positive count, not -EFAULT — confirm
 * whether a dropped line converts it. */
1765 err = copy_to_user((void *)uarg, buf, len);
1768 obd_ioctl_freedata(buf, len);
1771 case LL_IOC_LOV_SETSTRIPE:
1772 err = obd_alloc_memmd(conn, karg);
1776 case LL_IOC_LOV_GETSTRIPE:
1777 err = osc_getstripe(conn, karg, uarg);
1779 case OBD_IOC_CLIENT_RECOVER:
1780 err = ptlrpc_recover_import(obddev->u.cli.cl_import,
1783 case IOC_OSC_SET_ACTIVE:
1784 err = ptlrpc_set_import_active(obddev->u.cli.cl_import,
1788 CERROR ("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
1789 GOTO(out, err = -ENOTTY);
/* Key/value query interface.  Only "lock_to_stripe" is handled: for a
 * single-stripe OSC object the answer is always stripe 0.
 * NOTE(review): the '>' comparison implies @keylen includes the NUL
 * terminator (keylen > strlen means at least strlen+1 bytes) — confirm
 * against callers.  The value store and RETURN are on lines not visible
 * in this view. */
1795 static int osc_get_info(struct lustre_handle *conn, obd_count keylen,
1796 void *key, __u32 *vallen, void *val)
1799 if (!vallen || !val)
1802 if (keylen > strlen("lock_to_stripe") &&
1803 strcmp(key, "lock_to_stripe") == 0) {
1804 __u32 *stripe = val;
1805 *vallen = sizeof(*stripe);
/* Method table for the regular (network) OSC obd type.
 * FIX(review): the initializer mixed deprecated GNU "field:" labels with
 * C99 ".field =" designated initializers in the same struct; unified to
 * the C99 form used by the last three entries. Behavior is unchanged. */
1812 struct obd_ops osc_obd_ops = {
1813         .o_owner = THIS_MODULE,
1814         .o_attach = osc_attach,
1815         .o_detach = osc_detach,
1816         .o_setup = client_obd_setup,
1817         .o_cleanup = client_obd_cleanup,
1818         .o_connect = client_import_connect,
1819         .o_disconnect = client_import_disconnect,
1820         .o_statfs = osc_statfs,
1821         .o_packmd = osc_packmd,
1822         .o_unpackmd = osc_unpackmd,
1823         .o_create = osc_create,
1824         .o_destroy = osc_destroy,
1825         .o_getattr = osc_getattr,
1826         .o_getattr_async = osc_getattr_async,
1827         .o_setattr = osc_setattr,
1831         .o_brw_async = osc_brw_async,
1833         .o_enqueue = osc_enqueue,
1835         .o_cancel = osc_cancel,
1836         .o_cancel_unused = osc_cancel_unused,
1837         .o_iocontrol = osc_iocontrol,
1838         .o_get_info = osc_get_info,
1839         .o_mark_page_dirty = osc_mark_page_dirty,
1840         .o_clear_dirty_pages = osc_clear_dirty_pages,
1841         .o_last_dirty_offset = osc_last_dirty_offset,
/* Method table for the SAN variant of the OSC: identical to osc_obd_ops
 * except for client_sanobd_setup and the absence of the brw methods
 * visible here.
 * FIX(review): unified mixed GNU "field:" labels and C99 ".field ="
 * designated initializers to the C99 form. Behavior is unchanged. */
1844 struct obd_ops sanosc_obd_ops = {
1845         .o_owner = THIS_MODULE,
1846         .o_attach = osc_attach,
1847         .o_detach = osc_detach,
1848         .o_cleanup = client_obd_cleanup,
1849         .o_connect = client_import_connect,
1850         .o_disconnect = client_import_disconnect,
1851         .o_statfs = osc_statfs,
1852         .o_packmd = osc_packmd,
1853         .o_unpackmd = osc_unpackmd,
1854         .o_create = osc_create,
1855         .o_destroy = osc_destroy,
1856         .o_getattr = osc_getattr,
1857         .o_getattr_async = osc_getattr_async,
1858         .o_setattr = osc_setattr,
1862         .o_setup = client_sanobd_setup,
1866         .o_enqueue = osc_enqueue,
1868         .o_cancel = osc_cancel,
1869         .o_cancel_unused = osc_cancel_unused,
1870         .o_iocontrol = osc_iocontrol,
1871         .o_mark_page_dirty = osc_mark_page_dirty,
1872         .o_clear_dirty_pages = osc_clear_dirty_pages,
1873         .o_last_dirty_offset = osc_last_dirty_offset,
/* Module init: register both the OSC and SANOSC obd types with lprocfs
 * variables; if the second registration fails, the first is rolled back.
 * NOTE(review): rc declaration, the check after the first registration
 * and the final return are on lines not visible in this view. */
1876 int __init osc_init(void)
1878 struct lprocfs_static_vars lvars;
/* Client handles must fit in the space the MDS/obd layers reserve. */
1882 LASSERT(sizeof(struct obd_client_handle) <= FD_OSTDATA_SIZE);
1883 LASSERT(sizeof(struct obd_client_handle) <= OBD_INLINESZ);
1885 lprocfs_init_vars(&lvars);
1887 rc = class_register_type(&osc_obd_ops, lvars.module_vars,
1892 rc = class_register_type(&sanosc_obd_ops, lvars.module_vars,
1893 LUSTRE_SANOSC_NAME);
/* Roll back the OSC registration if SANOSC registration failed. */
1895 class_unregister_type(LUSTRE_OSC_NAME);
/* Module exit: unregister both obd types in reverse registration order. */
1900 static void __exit osc_exit(void)
1902 class_unregister_type(LUSTRE_SANOSC_NAME);
1903 class_unregister_type(LUSTRE_OSC_NAME);
/* Standard kernel module metadata and entry/exit hookup. */
1907 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1908 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
1909 MODULE_LICENSE("GPL");
1911 module_init(osc_init);
1912 module_exit(osc_exit);