* Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <lprocfs_status.h>
#include <libcfs/list.h>
#include "ost_internal.h"
+#include <lustre_fid.h>
static int oss_num_threads;
CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
*/
static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
__u64 start, __u64 count, struct lustre_handle *lh,
- int mode, int flags)
+ int mode, __u64 flags)
{
struct ldlm_res_id res_id;
ldlm_policy_data_t policy;
RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
LDLM_EXTENT, &policy, mode, &flags,
ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, NULL, lh));
+ ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
+ NULL, lh));
}
/* Helper function: release lock, if any. */
struct obd_trans_info *oti)
{
struct ost_body *body, *repbody;
- int rc, flags = 0;
+ __u64 flags = 0;
struct lustre_handle lh = {0,};
+ int rc;
ENTRY;
/* check that we do support OBD_CONNECT_TRUNCLOCK. */
struct obd_ioobj *obj, struct niobuf_remote *nb,
struct lustre_handle *lh)
{
- int flags = 0;
+ __u64 flags = 0;
int nrbufs = obj->ioo_bufcnt;
struct ldlm_res_id res_id;
ldlm_policy_data_t policy;
RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
LDLM_EXTENT, &policy, mode, &flags,
ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, NULL, lh));
+ ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
+ NULL, lh));
}
static void ost_brw_lock_put(int mode,
/* In normal mode of operation an I/O request is serviced only
* by ll_ost_io threads each of them has own tls buffers allocated by
- * ost_thread_init().
+ * ost_io_thread_init().
* During recovery, an I/O request may be queued until any of the ost
* service threads process it. Not necessary it should be one of
* ll_ost_io threads. In that case we dynamically allocating tls
lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
l_wait_event(waitq, 0, &lwi1);
rc = target_bulk_io(exp, desc, &lwi);
- ptlrpc_free_bulk(desc);
+ ptlrpc_free_bulk_nopin(desc);
}
RETURN(rc);
}
+/*
+ * Report a bulk-write checksum mismatch between client and server.
+ *
+ * When \a mmap is non-zero only a CDEBUG_LIMIT(D_INFO) message carrying
+ * both checksums is emitted.  Otherwise a console error (0x168) is printed
+ * naming the target, the request peer (plus the bulk sender as a "via" hop
+ * when its NID differs from the request peer NID), the parent FID, the
+ * object id/seq, the byte extent covered by \a local_nb and both checksums.
+ *
+ * \param req           write request; its OST body supplies the object ids
+ * \param desc          bulk descriptor; bd_sender is compared with the peer
+ * \param local_nb      local niobufs of the write
+ * \param npages        number of entries in \a local_nb; the last entry is
+ *                      read at index npages - 1, so it must be > 0
+ * \param client_cksum  checksum reported by the client
+ * \param server_cksum  checksum computed on the server
+ * \param mmap          non-zero selects the debug-only report
+ */
+static void ost_warn_on_cksum(struct ptlrpc_request *req,
+			      struct ptlrpc_bulk_desc *desc,
+			      struct niobuf_local *local_nb, int npages,
+			      obd_count client_cksum, obd_count server_cksum,
+			      int mmap)
+{
+	struct obd_export *exp = req->rq_export;
+	struct ost_body *body;
+	char *router = "";
+	char *via = "";
+
+	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+	LASSERT(body != NULL);
+
+	if (mmap) {
+		CDEBUG_LIMIT(D_INFO, "client csum %x, server csum %x\n",
+			     client_cksum, server_cksum);
+		return;
+	}
+
+	/* Only the console message needs the sender string, so resolve it
+	 * after the early return above rather than on every call. */
+	if (req->rq_peer.nid != desc->bd_sender) {
+		via = " via ";
+		router = libcfs_nid2str(desc->bd_sender);
+	}
+
+	LCONSOLE_ERROR_MSG(0x168, "BAD WRITE CHECKSUM: %s from %s%s%s inode "
+			   DFID" object "LPU64"/"LPU64" extent ["LPU64"-"LPU64
+			   "]: client csum %x, server csum %x\n",
+			   exp->exp_obd->obd_name, libcfs_id2str(req->rq_peer),
+			   via, router,
+			   body->oa.o_valid & OBD_MD_FLFID ?
+			   body->oa.o_parent_seq : (__u64)0,
+			   body->oa.o_valid & OBD_MD_FLFID ?
+			   body->oa.o_parent_oid : 0,
+			   body->oa.o_valid & OBD_MD_FLFID ?
+			   body->oa.o_parent_ver : 0,
+			   body->oa.o_id,
+			   body->oa.o_valid & OBD_MD_FLGROUP ?
+			   body->oa.o_seq : (__u64)0,
+			   local_nb[0].lnb_file_offset,
+			   local_nb[npages-1].lnb_file_offset +
+			   local_nb[npages-1].len - 1,
+			   client_cksum, server_cksum);
+}
+
static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct ptlrpc_bulk_desc *desc = NULL;
repbody->oa.o_cksum = server_cksum;
cksum_counter++;
if (unlikely(client_cksum != server_cksum)) {
- CDEBUG_LIMIT(mmap ? D_INFO : D_ERROR,
- "client csum %x, server csum %x\n",
- client_cksum, server_cksum);
+ ost_warn_on_cksum(req, desc, local_nb, npages,
+ client_cksum, server_cksum, mmap);
cksum_counter = 0;
+
} else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
cksum_counter, libcfs_id2str(req->rq_peer),
*/
repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
- if (unlikely(client_cksum != server_cksum && rc == 0 && !mmap)) {
- int new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
- char *msg;
- char *via;
- char *router;
-
- if (new_cksum == server_cksum)
- msg = "changed in transit before arrival at OST";
- else if (new_cksum == client_cksum)
- msg = "initial checksum before message complete";
- else
- msg = "changed in transit AND after initial checksum";
-
- if (req->rq_peer.nid == desc->bd_sender) {
- via = router = "";
- } else {
- via = " via ";
- router = libcfs_nid2str(desc->bd_sender);
- }
-
- LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from "
- "%s%s%s inode "DFID" object "
- LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
- exp->exp_obd->obd_name, msg,
- libcfs_id2str(req->rq_peer),
- via, router,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_seq : (__u64)0,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_oid : 0,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_ver : 0,
- body->oa.o_id,
- body->oa.o_valid & OBD_MD_FLGROUP ?
- body->oa.o_seq : (__u64)0,
- local_nb[0].lnb_file_offset,
- local_nb[npages-1].lnb_file_offset +
- local_nb[npages-1].len - 1 );
- CERROR("client csum %x, original server csum %x, "
- "server csum now %x\n",
- client_cksum, server_cksum, new_cksum);
- }
-
if (rc == 0) {
int nob = 0;
RETURN(rc);
}
-#define ost_init_sec_none(reply, exp) \
-do { \
- reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT | \
- OBD_CONNECT_RMT_CLIENT_FORCE | \
- OBD_CONNECT_OSS_CAPA); \
- cfs_spin_lock(&exp->exp_lock); \
- exp->exp_connect_flags = reply->ocd_connect_flags; \
- cfs_spin_unlock(&exp->exp_lock); \
+/*
+ * Clear the remote-client and OSS-capability connect flags in the reply
+ * and copy the resulting flags into the export while holding exp_lock.
+ * Both arguments are parenthesized so pointer expressions can be passed;
+ * note that each is evaluated more than once.
+ */
+#define ost_init_sec_none(reply, exp)					\
+do {									\
+	(reply)->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |	\
+					OBD_CONNECT_RMT_CLIENT_FORCE |	\
+					OBD_CONNECT_OSS_CAPA);		\
+	spin_lock(&(exp)->exp_lock);					\
+	(exp)->exp_connect_flags = (reply)->ocd_connect_flags;		\
+	spin_unlock(&(exp)->exp_lock);					\
} while (0)
static int ost_init_sec_level(struct ptlrpc_request *req)
if (!filter->fo_fl_oss_capa)
reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
- cfs_spin_lock(&exp->exp_lock);
- exp->exp_connect_flags = reply->ocd_connect_flags;
- cfs_spin_unlock(&exp->exp_lock);
+ spin_lock(&exp->exp_lock);
+ exp->exp_connect_flags = reply->ocd_connect_flags;
+ spin_unlock(&exp->exp_lock);
}
break;
default:
}
if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
- cfs_read_lock(&filter->fo_sptlrpc_lock);
- sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
- req->rq_sp_from,
- req->rq_peer.nid,
- &flvr);
- cfs_read_unlock(&filter->fo_sptlrpc_lock);
+ read_lock(&filter->fo_sptlrpc_lock);
+ sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
+ req->rq_sp_from,
+ req->rq_peer.nid,
+ &flvr);
+ read_unlock(&filter->fo_sptlrpc_lock);
- cfs_spin_lock(&exp->exp_lock);
+ spin_lock(&exp->exp_lock);
exp->exp_sp_peer = req->rq_sp_from;
exp->exp_flvr = flvr;
rc = -EACCES;
}
- cfs_spin_unlock(&exp->exp_lock);
+ spin_unlock(&exp->exp_lock);
} else {
if (exp->exp_sp_peer != req->rq_sp_from) {
CERROR("RPC source %s doesn't match %s\n",
lustre_msg_get_version(msg),
LUSTRE_OBD_VERSION);
break;
+ case SEQ_QUERY:
+ /* Note: clients always use MDS_VERSION for FID requests */
+ rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ lustre_msg_get_opc(msg),
+ lustre_msg_get_version(msg),
+ LUSTRE_MDS_VERSION);
+ break;
case OST_CREATE:
case OST_DESTROY:
case OST_GETATTR:
}
- cfs_spin_lock_bh(&exp->exp_bl_list_lock);
+ spin_lock_bh(&exp->exp_bl_list_lock);
cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
ost_prolong_lock_one(data, lock);
}
- cfs_spin_unlock_bh(&exp->exp_bl_list_lock);
+ spin_unlock_bh(&exp->exp_bl_list_lock);
- EXIT;
+ EXIT;
}
/**
};
/** Assign high priority operations to the request if needed. */
-static int ost_hpreq_handler(struct ptlrpc_request *req)
+static int ost_io_hpreq_handler(struct ptlrpc_request *req)
{
ENTRY;
if (req->rq_export) {
RETURN(0);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
GOTO(out, rc = -ENOSPC);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
+ if (OBD_FAIL_TIMEOUT(OBD_FAIL_OST_EROFS, 1))
GOTO(out, rc = -EROFS);
rc = ost_brw_write(req, oti);
LASSERT(current->journal_info == NULL);
req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
rc = ost_get_info(req->rq_export, req);
break;
+ case SEQ_QUERY:
+ CDEBUG(D_INODE, "seq\n");
+ rc = seq_handle(req);
+ break;
case OST_QUOTACHECK:
CDEBUG(D_INODE, "quotacheck\n");
req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
if (rc)
RETURN(rc);
RETURN(ptlrpc_reply(req));
- case LDLM_ENQUEUE:
- CDEBUG(D_INODE, "enqueue\n");
- req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
- RETURN(0);
- rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
- ost_blocking_ast,
- ldlm_server_glimpse_ast);
- fail = OBD_FAIL_OST_LDLM_REPLY_NET;
- break;
- case LDLM_CONVERT:
- CDEBUG(D_INODE, "convert\n");
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
- RETURN(0);
- rc = ldlm_handle_convert(req);
- break;
- case LDLM_CANCEL:
- CDEBUG(D_INODE, "cancel\n");
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
- RETURN(0);
- rc = ldlm_handle_cancel(req);
- break;
+ case LDLM_ENQUEUE:
+ CDEBUG(D_INODE, "enqueue\n");
+ req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_NET))
+ RETURN(0);
+ rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
+ ost_blocking_ast,
+ ldlm_server_glimpse_ast);
+ fail = OBD_FAIL_OST_LDLM_REPLY_NET;
+ break;
+ case LDLM_CONVERT:
+ CDEBUG(D_INODE, "convert\n");
+ req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT_NET))
+ RETURN(0);
+ rc = ldlm_handle_convert(req);
+ break;
+ case LDLM_CANCEL:
+ CDEBUG(D_INODE, "cancel\n");
+ req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET))
+ RETURN(0);
+ rc = ldlm_handle_cancel(req);
+ break;
case LDLM_BL_CALLBACK:
case LDLM_CP_CALLBACK:
CDEBUG(D_INODE, "callback\n");
return 0;
}
EXPORT_SYMBOL(ost_handle);
+
/*
- * free per-thread pool created by ost_thread_init().
+ * free per-thread pool created by ost_io_thread_init().
*/
-static void ost_thread_done(struct ptlrpc_thread *thread)
+static void ost_io_thread_done(struct ptlrpc_thread *thread)
{
struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
* Storage */
/*
* be prepared to handle partially-initialized pools (because this is
- * called from ost_thread_init() for cleanup.
+ * called from ost_io_thread_init() for cleanup).
*/
tls = thread->t_data;
if (tls != NULL) {
/*
* initialize per-thread page pool (bug 5137).
*/
-static int ost_thread_init(struct ptlrpc_thread *thread)
+static int ost_io_thread_init(struct ptlrpc_thread *thread)
{
struct ost_thread_local_cache *tls;
lprocfs_ost_init_vars(&lvars);
lprocfs_obd_setup(obd, lvars.obd_vars);
- cfs_mutex_init(&ost->ost_health_mutex);
+ mutex_init(&ost->ost_health_mutex);
svc_conf = (typeof(svc_conf)) {
.psc_name = LUSTRE_OSS_NAME,
.psc_ops = {
.so_req_handler = ost_handle,
.so_req_printer = target_print_req,
- .so_hpreq_handler = NULL,
},
};
ost->ost_create_service = ptlrpc_register_service(&svc_conf,
oss_io_cpts : NULL,
},
.psc_ops = {
- .so_thr_init = ost_thread_init,
- .so_thr_done = ost_thread_done,
+ .so_thr_init = ost_io_thread_init,
+ .so_thr_done = ost_io_thread_done,
.so_req_handler = ost_handle,
- .so_hpreq_handler = ost_hpreq_handler,
+ .so_hpreq_handler = ost_io_hpreq_handler,
.so_req_printer = target_print_req,
- .so_hpreq_handler = NULL,
},
};
ost->ost_io_service = ptlrpc_register_service(&svc_conf,
GOTO(out_create, rc);
}
+ memset(&svc_conf, 0, sizeof(svc_conf));
+ svc_conf = (typeof(svc_conf)) {
+ .psc_name = "ost_seq",
+ .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
+ .psc_buf = {
+ .bc_nbufs = OST_NBUFS,
+ .bc_buf_size = OST_BUFSIZE,
+ .bc_req_max_size = OST_MAXREQSIZE,
+ .bc_rep_max_size = OST_MAXREPSIZE,
+ .bc_req_portal = SEQ_DATA_PORTAL,
+ .bc_rep_portal = OSC_REPLY_PORTAL,
+ },
+ .psc_thr = {
+ .tc_thr_name = "ll_ost_seq",
+ .tc_thr_factor = OSS_CR_THR_FACTOR,
+ .tc_nthrs_init = OSS_CR_NTHRS_INIT,
+ .tc_nthrs_base = OSS_CR_NTHRS_BASE,
+ .tc_nthrs_max = OSS_CR_NTHRS_MAX,
+ .tc_nthrs_user = oss_num_create_threads,
+ .tc_cpu_affinity = 1,
+ .tc_ctx_tags = LCT_DT_THREAD,
+ },
+
+ .psc_cpt = {
+ .cc_pattern = oss_cpts,
+ },
+ .psc_ops = {
+ .so_req_handler = ost_handle,
+ .so_req_printer = target_print_req,
+ .so_hpreq_handler = NULL,
+ },
+ };
+ ost->ost_seq_service = ptlrpc_register_service(&svc_conf,
+ obd->obd_proc_entry);
+ if (IS_ERR(ost->ost_seq_service)) {
+ rc = PTR_ERR(ost->ost_seq_service);
+ CERROR("failed to start OST seq service: %d\n", rc);
+ ost->ost_seq_service = NULL;
+ GOTO(out_io, rc);
+ }
+
ping_evictor_start();
RETURN(0);
-
+out_io:
+ ptlrpc_unregister_service(ost->ost_io_service);
+ ost->ost_io_service = NULL;
out_create:
ptlrpc_unregister_service(ost->ost_create_service);
ost->ost_create_service = NULL;
static int ost_cleanup(struct obd_device *obd)
{
- struct ost_obd *ost = &obd->u.ost;
- int err = 0;
- ENTRY;
-
- ping_evictor_stop();
+ struct ost_obd *ost = &obd->u.ost;
+ int err = 0;
+ ENTRY;
- /* there is no recovery for OST OBD, all recovery is controlled by
- * obdfilter OBD */
- LASSERT(obd->obd_recovering == 0);
- cfs_mutex_lock(&ost->ost_health_mutex);
- ptlrpc_unregister_service(ost->ost_service);
- ptlrpc_unregister_service(ost->ost_create_service);
- ptlrpc_unregister_service(ost->ost_io_service);
- ost->ost_service = NULL;
- ost->ost_create_service = NULL;
+ ping_evictor_stop();
+
+ /* there is no recovery for OST OBD, all recovery is controlled by
+ * obdfilter OBD */
+ LASSERT(obd->obd_recovering == 0);
+ mutex_lock(&ost->ost_health_mutex);
+ ptlrpc_unregister_service(ost->ost_service);
+ ptlrpc_unregister_service(ost->ost_create_service);
+ ptlrpc_unregister_service(ost->ost_io_service);
+ ptlrpc_unregister_service(ost->ost_seq_service);
+ ost->ost_service = NULL;
+ ost->ost_create_service = NULL;
ost->ost_io_service = NULL;
+ ost->ost_seq_service = NULL;
- cfs_mutex_unlock(&ost->ost_health_mutex);
+ mutex_unlock(&ost->ost_health_mutex);
lprocfs_obd_cleanup(obd);
struct ost_obd *ost = &obd->u.ost;
int rc = 0;
- cfs_mutex_lock(&ost->ost_health_mutex);
+ mutex_lock(&ost->ost_health_mutex);
rc |= ptlrpc_service_health_check(ost->ost_service);
rc |= ptlrpc_service_health_check(ost->ost_create_service);
rc |= ptlrpc_service_health_check(ost->ost_io_service);
- cfs_mutex_unlock(&ost->ost_health_mutex);
+ mutex_unlock(&ost->ost_health_mutex);
/*
* health_check to return 0 on healthy