* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <obd.h>
/* osc_build_res_name() */
#include <cl_object.h>
-#include <lclient.h>
#include "osc_internal.h"
/** \defgroup osc osc
* State maintained by osc layer for each IO context.
*/
struct osc_io {
- /** super class */
- struct cl_io_slice oi_cl;
- /** true if this io is lockless. */
- int oi_lockless;
+ /** super class */
+ struct cl_io_slice oi_cl;
+ /** true if this io is lockless. */
+ int oi_lockless:1,
+ /** true if this io is counted as active IO */
+ oi_is_active:1;
+ /** how many LRU pages are reserved for this IO */
+ unsigned long oi_lru_reserved;
+
/** active extents, we know how many bytes is going to be written,
* so having an active extent will prevent it from being fragmented */
struct osc_extent *oi_active;
/** partially truncated extent, we need to hold this extent to prevent
* page writeback from happening. */
struct osc_extent *oi_trunc;
-
- int oi_lru_reserved;
-
- struct obd_info oi_info;
+ /** write osc_lock for this IO, used by osc_extent_find(). */
+ struct osc_lock *oi_write_osclock;
struct obdo oi_oa;
struct osc_async_cbargs {
bool opc_rpc_sent;
};
/**
- * State of transfer for osc.
- */
-struct osc_req {
- struct cl_req_slice or_cl;
-};
-
-/**
* State maintained by osc layer for the duration of a system call.
*/
struct osc_session {
#define OTI_PVEC_SIZE 256
struct osc_thread_info {
- struct ldlm_res_id oti_resname;
- ldlm_policy_data_t oti_policy;
- struct cl_lock_descr oti_descr;
- struct cl_attr oti_attr;
- struct lustre_handle oti_handle;
- struct cl_page_list oti_plist;
+ struct ldlm_res_id oti_resname;
+ union ldlm_policy_data oti_policy;
+ struct cl_lock_descr oti_descr;
+ struct cl_attr oti_attr;
+ struct lustre_handle oti_handle;
+ struct cl_page_list oti_plist;
struct cl_io oti_io;
void *oti_pvec[OTI_PVEC_SIZE];
/**
*/
pgoff_t oti_next_index;
pgoff_t oti_fn_index; /* first non-overlapped index */
+ struct cl_sync_io oti_anchor;
+ struct cl_req_attr oti_req_attr;
};
struct osc_object {
/** Serialization object for osc_object::oo_debug_io. */
struct mutex oo_debug_mutex;
#endif
- /**
- * List of pages in transfer.
- */
- struct list_head oo_inflight[CRT_NR];
- /**
- * Lock, protecting ccc_object::cob_inflight, because a seat-belt is
- * locked during take-off and landing.
- */
- spinlock_t oo_seatbelt;
-
/**
* used by the osc to keep track of what objects to build into rpcs.
* Protected by client_obd->cli_loi_list_lock.
/**
* Radix tree for caching pages
*/
- struct radix_tree_root oo_tree;
spinlock_t oo_tree_lock;
+ struct radix_tree_root oo_tree;
unsigned long oo_npages;
+
+ /* Protect osc_lock this osc_object has */
+ struct list_head oo_ol_list;
+ spinlock_t oo_ol_spin;
+
+ /** number of active IOs of this object */
+ atomic_t oo_nr_ios;
+ wait_queue_head_t oo_io_waitq;
};
static inline void osc_object_lock(struct osc_object *obj)
OLS_ENQUEUED,
OLS_UPCALL_RECEIVED,
OLS_GRANTED,
- OLS_RELEASED,
- OLS_BLOCKED,
OLS_CANCELLED
};
*
* Interaction with DLM.
*
- * CLIO enqueues all DLM locks through ptlrpcd (that is, in "async" mode).
- *
* Once receive upcall is invoked, osc_lock remembers a handle of DLM lock in
- * osc_lock::ols_handle and a pointer to that lock in osc_lock::ols_lock.
+ * osc_lock::ols_handle and a pointer to that lock in osc_lock::ols_dlmlock.
*
* This pointer is protected through a reference, acquired by
* osc_lock_upcall0(). Also, an additional reference is acquired by
* future.
*/
struct osc_lock {
- struct cl_lock_slice ols_cl;
- /** underlying DLM lock */
- struct ldlm_lock *ols_lock;
- /** lock value block */
- struct ost_lvb ols_lvb;
- /** DLM flags with which osc_lock::ols_lock was enqueued */
- __u64 ols_flags;
- /** osc_lock::ols_lock handle */
- struct lustre_handle ols_handle;
- struct ldlm_enqueue_info ols_einfo;
- enum osc_lock_state ols_state;
+ struct cl_lock_slice ols_cl;
+ /** Internal lock to protect states, etc. */
+ spinlock_t ols_lock;
+ /** Owner sleeps on this channel for state change */
+ struct cl_sync_io *ols_owner;
+ /** waiting list for this lock to be cancelled */
+ struct list_head ols_waiting_list;
+ /** wait entry of ols_waiting_list */
+ struct list_head ols_wait_entry;
+ /** list entry for osc_object::oo_ol_list */
+ struct list_head ols_nextlock_oscobj;
+
+ /** underlying DLM lock */
+ struct ldlm_lock *ols_dlmlock;
+ /** DLM flags with which osc_lock::ols_lock was enqueued */
+ __u64 ols_flags;
+ /** osc_lock::ols_lock handle */
+ struct lustre_handle ols_handle;
+ struct ldlm_enqueue_info ols_einfo;
+ enum osc_lock_state ols_state;
+ /** lock value block */
+ struct ost_lvb ols_lvb;
/**
* true, if ldlm_lock_addref() was called against
*/
ols_locklessable:1,
/**
- * set by osc_lock_use() to wait until blocking AST enters into
- * osc_ldlm_blocking_ast0(), so that cl_lock mutex can be used for
- * further synchronization.
- */
- ols_ast_wait:1,
- /**
- * If the data of this lock has been flushed to server side.
- */
- ols_flush:1,
- /**
* if set, the osc_lock is a glimpse lock. For glimpse locks, we treat
* the EVAVAIL error as torerable, this will make upper logic happy
* to wait all glimpse locks to each OSTs to be completed.
* For async glimpse lock.
*/
ols_agl:1;
- /**
- * IO that owns this lock. This field is used for a dead-lock
- * avoidance by osc_lock_enqueue_wait().
- *
- * XXX: unfortunately, the owner of a osc_lock is not unique,
- * the lock may have multiple users, if the lock is granted and
- * then matched.
- */
- struct osc_io *ols_owner;
};
*/
struct list_head ops_lru;
/**
- * Linkage into a per-osc_object list of pages in flight. For
- * debugging.
- */
- struct list_head ops_inflight;
- /**
- * Thread that submitted this page for transfer. For debugging.
- */
- struct task_struct *ops_submitter;
- /**
* Submit time - the time when the page is starting RPC. For debugging.
*/
cfs_time_t ops_submit_time;
extern struct kmem_cache *osc_object_kmem;
extern struct kmem_cache *osc_thread_kmem;
extern struct kmem_cache *osc_session_kmem;
-extern struct kmem_cache *osc_req_kmem;
extern struct kmem_cache *osc_extent_kmem;
extern struct lu_device_type osc_device_type;
const struct cl_io *io);
int osc_io_init (const struct lu_env *env,
struct cl_object *obj, struct cl_io *io);
-int osc_req_init (const struct lu_env *env, struct cl_device *dev,
- struct cl_req *req);
struct lu_object *osc_object_alloc(const struct lu_env *env,
const struct lu_object_header *hdr,
struct lu_device *dev);
int osc_page_init(const struct lu_env *env, struct cl_object *obj,
struct cl_page *page, pgoff_t ind);
-void osc_index2policy (ldlm_policy_data_t *policy, const struct cl_object *obj,
- pgoff_t start, pgoff_t end);
-int osc_lvb_print (const struct lu_env *env, void *cookie,
- lu_printer_t p, const struct ost_lvb *lvb);
+void osc_index2policy(union ldlm_policy_data *policy,
+ const struct cl_object *obj, pgoff_t start, pgoff_t end);
+int osc_lvb_print(const struct lu_env *env, void *cookie,
+ lu_printer_t p, const struct ost_lvb *lvb);
void osc_lru_add_batch(struct client_obd *cli, struct list_head *list);
void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
enum cl_req_type crt, int brw_flags);
int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops);
int osc_set_async_flags(struct osc_object *obj, struct osc_page *opg,
- obd_flag async_flags);
+ u32 async_flags);
int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
struct page *page, loff_t offset);
int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops);
int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
struct list_head *list, int cmd, int brw_flags);
-int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio,
- struct osc_object *obj, __u64 size);
-void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
- struct osc_object *obj);
+int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj,
+ __u64 size, struct osc_extent **extp);
+void osc_cache_truncate_end(const struct lu_env *env, struct osc_extent *ext);
int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
pgoff_t start, pgoff_t end, int hp, int discard);
int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
pgoff_t start, pgoff_t end);
void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
- struct osc_object *osc, pdl_policy_t pol);
+ struct osc_object *osc);
int lru_queue_work(const struct lu_env *env, void *data);
void osc_object_set_contended (struct osc_object *obj);
return (struct cl_object *)&obj->oo_cl;
}
-static inline ldlm_mode_t osc_cl_lock2ldlm(enum cl_lock_mode mode)
+static inline enum ldlm_mode osc_cl_lock2ldlm(enum cl_lock_mode mode)
{
- LASSERT(mode == CLM_READ || mode == CLM_WRITE || mode == CLM_GROUP);
- if (mode == CLM_READ)
- return LCK_PR;
- else if (mode == CLM_WRITE)
- return LCK_PW;
- else
- return LCK_GROUP;
+ LASSERT(mode == CLM_READ || mode == CLM_WRITE || mode == CLM_GROUP);
+ if (mode == CLM_READ)
+ return LCK_PR;
+ if (mode == CLM_WRITE)
+ return LCK_PW;
+ return LCK_GROUP;
}
-static inline enum cl_lock_mode osc_ldlm2cl_lock(ldlm_mode_t mode)
+static inline enum cl_lock_mode osc_ldlm2cl_lock(enum ldlm_mode mode)
{
- LASSERT(mode == LCK_PR || mode == LCK_PW || mode == LCK_GROUP);
- if (mode == LCK_PR)
- return CLM_READ;
- else if (mode == LCK_PW)
- return CLM_WRITE;
- else
- return CLM_GROUP;
+ LASSERT(mode == LCK_PR || mode == LCK_PW || mode == LCK_GROUP);
+ if (mode == LCK_PR)
+ return CLM_READ;
+ if (mode == LCK_PW)
+ return CLM_WRITE;
+ return CLM_GROUP;
}
static inline struct osc_page *cl2osc_page(const struct cl_page_slice *slice)
return (struct osc_page *)container_of(oap, struct osc_page, ops_oap);
}
+static inline struct osc_page *
+osc_cl_page_osc(struct cl_page *page, struct osc_object *osc)
+{
+ const struct cl_page_slice *slice;
+
+ LASSERT(osc != NULL);
+ slice = cl_object_page_slice(&osc->oo_cl, page);
+ return cl2osc_page(slice);
+}
+
static inline struct osc_lock *cl2osc_lock(const struct cl_lock_slice *slice)
{
LINVRNT(osc_is_object(&slice->cls_obj->co_lu));
/** link list of osc_object's oo_{hp|urgent|locking}_exts. */
struct list_head oe_link;
/** state of this extent */
- unsigned int oe_state;
+ enum osc_extent_state oe_state;
/** flags for this extent. */
unsigned int oe_intree:1,
/** 0 is write, 1 is read */
oe_rw:1,
+ /** sync extent, queued by osc_queue_sync_pages() */
+ oe_sync:1,
+ /** set if this extent has partial, sync pages.
+ * Extents with partial page(s) can't merge with others in RPC */
+ oe_no_merge:1,
oe_srvlock:1,
oe_memalloc:1,
/** an ACTIVE extent is going to be truncated, so when this extent
* state has changed. */
wait_queue_head_t oe_waitq;
/** lock covering this extent */
- struct cl_lock *oe_osclock;
+ struct ldlm_lock *oe_dlmlock;
/** terminator of this extent. Must be true if this extent is in IO. */
struct task_struct *oe_owner;
/** return value of writeback. If somebody is waiting for this extent,
int sent, int rc);
int osc_extent_release(const struct lu_env *env, struct osc_extent *ext);
-int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *lock);
+int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
+ pgoff_t start, pgoff_t end, enum cl_lock_mode mode);
typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *,
struct osc_page *, void *);
int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
struct osc_object *osc, pgoff_t start, pgoff_t end,
osc_page_gang_cbt cb, void *cbdata);
-
/** @} osc */
#endif /* OSC_CL_INTERNAL_H */