From c067da2e9f279550c94acdf194b91dff7ad56f19 Mon Sep 17 00:00:00 2001 From: rread Date: Mon, 13 Oct 2003 22:44:43 +0000 Subject: [PATCH] msg --- lustre/obdclass/llog_test.c | 5 +- lustre/ptlrpc/llogd.c | 336 +++++++++++++++++++++++++++++------------- lustre/ptlrpc/ptlrpc_module.c | 7 +- 3 files changed, 242 insertions(+), 106 deletions(-) diff --git a/lustre/obdclass/llog_test.c b/lustre/obdclass/llog_test.c index 752459e..6f9e099 100644 --- a/lustre/obdclass/llog_test.c +++ b/lustre/obdclass/llog_test.c @@ -334,7 +334,8 @@ static int llog_test_5(struct obd_device *obd) } -static int llog_test6_process_rec(struct llog_rec_hdr * rec, void * private) +static int llog_test6_process_rec(struct llog_handle *handle, + struct llog_rec_hdr * rec, void * private) { return 0; } @@ -364,7 +365,7 @@ static int llog_test_6(struct obd_device *obd, char * name) } exp = class_conn2export(&exph); - rc = mdc_llog_process(exp, name, NULL, llog_test6_process_rec); + rc = mdc_llog_process(exp, name, llog_test6_process_rec, NULL); if (rc) { CERROR("mdc_llog_process failed: rc = %d\n", rc); } diff --git a/lustre/ptlrpc/llogd.c b/lustre/ptlrpc/llogd.c index e5525c9..58e3109 100644 --- a/lustre/ptlrpc/llogd.c +++ b/lustre/ptlrpc/llogd.c @@ -35,61 +35,60 @@ #include #include -int llogd_init(struct ptlrpc_request *req) +int llogd_create(struct ptlrpc_request *req) { struct obd_export *exp = req->rq_export; struct obd_device *obd = exp->exp_obd; struct llog_handle *loghandle; - struct llog_desc desc; + struct llogd_body *body; struct obd_run_ctxt saved; - void * ptr; - char * name; - int size[] = {sizeof (desc), - LLOG_BITMAP_BYTES}; + struct llog_logid *logid = NULL; + char * name = NULL; + int size = sizeof (*body); int rc, rc2; ENTRY; LASSERT(obd->obd_log_exp == NULL); - name = lustre_msg_string(req->rq_reqmsg, 0, 0); - if (name == NULL) { - CERROR("Can't unpack name\n"); - GOTO(out, rc = -EFAULT); - } + body = lustre_swab_reqbuf(req, 0, sizeof(*body), + lustre_swab_llogd_body); + if (body == NULL) { + CERROR ("Can't unpack llogd_body\n"); + GOTO(out, rc =-EFAULT); + } + + if (body->lgd_logid.lgl_oid > 0) + logid = &body->lgd_logid; + + if (req->rq_reqmsg->bufcount > 1) { + name = lustre_msg_string(req->rq_reqmsg, 1, 0); + if (name == NULL) { + CERROR("Can't unpack name\n"); + GOTO(out, rc = -EFAULT); + } + } - memset(&desc, 0, sizeof(desc)); - push_ctxt(&saved, &obd->obd_ctxt, NULL); obd->obd_log_exp = class_export_get(exp); - rc = llog_create(obd, &loghandle, NULL, name); + rc = llog_create(obd, &loghandle, logid, name); if (rc) GOTO(out_pop, rc); - desc.lgd_logid = loghandle->lgh_id; - desc.lgd_cur_offset = LLOG_CHUNK_SIZE; /* skip header block */ - - rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, - NULL); - if (rc) - GOTO(out_close, rc); - rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg); + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) GOTO(out_close, rc = -ENOMEM); - ptr = lustre_msg_buf(req->rq_repmsg, 0, sizeof (desc)); - memcpy(ptr, &desc, sizeof(desc)); - - ptr = lustre_msg_buf(req->rq_repmsg, 1, LLOG_BITMAP_BYTES); - memcpy(ptr, loghandle->lgh_hdr->llh_bitmap, LLOG_BITMAP_BYTES); + body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body)); + body->lgd_logid = loghandle->lgh_id; out_close: rc2 = llog_close(loghandle); class_export_put(obd->obd_log_exp); - obd->obd_log_exp = NULL; if (!rc) rc = rc2; out_pop: + obd->obd_log_exp = NULL; pop_ctxt(&saved, &obd->obd_ctxt, NULL); out: RETURN(rc); @@ -100,21 +99,21 @@ int llogd_next_block(struct ptlrpc_request *req) struct obd_export *exp = req->rq_export; struct obd_device *obd = exp->exp_obd; struct llog_handle *loghandle; - struct llog_desc *desc; + struct llogd_body *body; struct obd_run_ctxt saved; __u8 *buf; void * ptr; - int size[] = {sizeof (*desc), + int size[] = {sizeof (*body), LLOG_CHUNK_SIZE}; int rc, rc2; ENTRY; LASSERT(obd->obd_log_exp == NULL); - desc = lustre_swab_reqbuf(req, 0, sizeof(*desc), - lustre_swab_llog_desc); - if (desc == NULL) { - CERROR ("Can't unpack llog_desc\n"); + body = lustre_swab_reqbuf(req, 0, sizeof(*body), + lustre_swab_llogd_body); + if (body == NULL) { + CERROR ("Can't unpack llogd_body\n"); GOTO(out, rc =-EFAULT); } @@ -125,7 +124,7 @@ int llogd_next_block(struct ptlrpc_request *req) push_ctxt(&saved, &obd->obd_ctxt, NULL); obd->obd_log_exp = class_export_get(exp); - rc = llog_create(obd, &loghandle, &desc->lgd_logid, NULL); + rc = llog_create(obd, &loghandle, &body->lgd_logid, NULL); if (rc) GOTO(out_pop, rc); @@ -134,9 +133,9 @@ int llogd_next_block(struct ptlrpc_request *req) GOTO(out_close, rc); memset(buf, 0, LLOG_CHUNK_SIZE); - rc = llog_next_block(loghandle, &desc->lgd_saved_index, - desc->lgd_index, - &desc->lgd_cur_offset, buf, LLOG_CHUNK_SIZE); + rc = llog_next_block(loghandle, &body->lgd_saved_index, + body->lgd_index, + &body->lgd_cur_offset, buf, LLOG_CHUNK_SIZE); if (rc) GOTO(out_close, rc); @@ -145,8 +144,8 @@ int llogd_next_block(struct ptlrpc_request *req) if (rc) GOTO(out_close, rc = -ENOMEM); - ptr = lustre_msg_buf(req->rq_repmsg, 0, sizeof (desc)); - memcpy(ptr, desc, sizeof(*desc)); + ptr = lustre_msg_buf(req->rq_repmsg, 0, sizeof (body)); + memcpy(ptr, body, sizeof(*body)); ptr = lustre_msg_buf(req->rq_repmsg, 1, LLOG_CHUNK_SIZE); memcpy(ptr, buf, LLOG_CHUNK_SIZE); @@ -165,6 +164,67 @@ out: RETURN(rc); } +int llogd_read_header(struct ptlrpc_request *req) +{ + struct obd_export *exp = req->rq_export; + struct obd_device *obd = exp->exp_obd; + struct llog_handle *loghandle; + struct llogd_body *body; + struct llog_log_hdr *hdr; + struct obd_run_ctxt saved; + __u8 *buf; + int size[] = {sizeof (*hdr)}; + int rc, rc2; + ENTRY; + + LASSERT(obd->obd_log_exp == NULL); + + body = lustre_swab_reqbuf(req, 0, sizeof(*body), + lustre_swab_llogd_body); + if (body == NULL) { + CERROR ("Can't unpack llogd_body\n"); + GOTO(out, rc =-EFAULT); + } + + OBD_ALLOC(buf, LLOG_CHUNK_SIZE); + if (!buf) + GOTO(out, rc = -ENOMEM); + + push_ctxt(&saved, &obd->obd_ctxt, NULL); + obd->obd_log_exp = class_export_get(exp); + + rc = llog_create(obd, &loghandle, &body->lgd_logid, NULL); + if (rc) + GOTO(out_pop, rc); + + /* init_handle reads the header */ + rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL); + if (rc) + GOTO(out_close, rc); + + + rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) + GOTO(out_close, rc = -ENOMEM); + + hdr = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*hdr)); + memcpy(hdr, loghandle->lgh_hdr, sizeof(*hdr)); + +out_close: + rc2 = llog_close(loghandle); + class_export_put(obd->obd_log_exp); + obd->obd_log_exp = NULL; + if (!rc) + rc = rc2; + +out_pop: + pop_ctxt(&saved, &obd->obd_ctxt, NULL); + OBD_FREE(buf, LLOG_CHUNK_SIZE); + +out: + RETURN(rc); +} + int llogd_close(struct ptlrpc_request *req) { int rc; @@ -175,105 +235,160 @@ int llogd_close(struct ptlrpc_request *req) } -int llogd_client_init(struct obd_export *exp, char * logname, - struct llog_desc **desc, __u32 **bitmap) +/* This is a callback from the llog_* functions. + * Assumes caller has already pushed us into the kernel context. */ +int llogd_client_create(struct obd_device *obd, struct llog_handle **res, + struct llog_logid *logid, char *name) { - struct obd_import *imp = class_exp2cliimp(exp); + struct client_obd *cli = &obd->u.cli; + struct obd_import *imp = cli->cl_import; + struct llogd_body req_body; + struct llogd_body *body; + struct llog_handle *handle; struct ptlrpc_request *req = NULL; - void * ptr; - int size; - int repsize[] = {sizeof (**desc), - LLOG_BITMAP_BYTES}; + int size[2] = {sizeof(req_body)}; + char *tmp[2] = {(char*) &req_body}; + int bufcount = 1; + int repsize[] = {sizeof (req_body)}; int rc; ENTRY; - LASSERT(*desc == NULL); - LASSERT(*bitmap == NULL); - - size = strlen(logname) + 1; - req = ptlrpc_prep_req(imp, LLOG_INIT, 1, &size, &logname); - if (!req) - GOTO(out, rc = -ENOMEM); - - req->rq_replen = lustre_msg_size(2, repsize); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); + handle = llog_alloc_handle(); + if (handle == NULL) + RETURN(-ENOMEM); + *res = handle; + + memset(&req_body, 0, sizeof(req_body)); + if (logid) + req_body.lgd_logid = *logid; - OBD_ALLOC(*desc, sizeof (**desc)); - if (*desc == NULL) - GOTO(out, rc = -ENOMEM); + if (name) { + size[bufcount] = strlen(name) + 1; + tmp[bufcount] = name; + bufcount++; + } - OBD_ALLOC(*bitmap, LLOG_BITMAP_BYTES); - if (*bitmap == NULL) + req = ptlrpc_prep_req(imp, LLOGD_CREATE, bufcount, size, tmp); + if (!req) GOTO(err_free, rc = -ENOMEM); - - ptr = lustre_swab_repbuf(req, 0, sizeof(**desc), - lustre_swab_llog_desc); - if (ptr == NULL) { - CERROR ("Can't unpack llog_desc\n"); - GOTO(err_free, rc =-EFAULT); - } - memcpy(*desc, ptr, sizeof(**desc)); + req->rq_replen = lustre_msg_size(1, repsize); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(err_free, rc); - ptr = lustre_msg_buf(req->rq_repmsg, 1, LLOG_BITMAP_BYTES); - if (ptr == NULL) { - CERROR ("Can't unpack bitmap\n"); + body = lustre_swab_repbuf(req, 0, sizeof(*body), + lustre_swab_llogd_body); + if (body == NULL) { + CERROR ("Can't unpack llogd_body\n"); GOTO(err_free, rc =-EFAULT); } - memcpy(*bitmap, ptr, LLOG_BITMAP_BYTES); + + handle->lgh_id = body->lgd_logid; + handle->lgh_obd = obd; out: if (req) ptlrpc_req_finished(req); - RETURN(rc); + RETURN(0); err_free: - if (*bitmap) - OBD_FREE(*bitmap, LLOG_BITMAP_BYTES); - if (*desc) - OBD_FREE(*desc, sizeof (**desc)); + llog_free_handle(handle); goto out; } -int llogd_client_next_block(struct obd_export *exp, struct llog_desc *desc, - char * buf, int buf_size) + +struct obd_import *llog_lgh2imp(struct llog_handle *lgh) { - struct obd_import *imp = class_exp2cliimp(exp); + struct client_obd *cli = &lgh->lgh_obd->u.cli; + return cli->cl_import; +} + +int llogd_client_next_block(struct llog_handle *loghandle, + int *cur_idx, int next_idx, + __u64 *cur_offset, void *buf, int len) +{ + struct obd_import *imp = llog_lgh2imp(loghandle); struct ptlrpc_request *req = NULL; + struct llogd_body *body; void * ptr; - int size = sizeof(*desc); - int repsize[] = {sizeof (*desc), - LLOG_CHUNK_SIZE}; + int size = sizeof(*body); + int repsize[2] = {sizeof (*body)}; int rc; ENTRY; - LASSERT (buf_size == LLOG_CHUNK_SIZE); - - req = ptlrpc_prep_req(imp, LLOG_NEXT_BLOCK, 1, &size, (char **) &desc); + req = ptlrpc_prep_req(imp, LLOGD_NEXT_BLOCK, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); + body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body)); + body->lgd_logid = loghandle->lgh_id; + body->lgd_cur_offset = *cur_offset; + body->lgd_index = next_idx; + body->lgd_saved_index = *cur_idx; + body->lgd_len = len; + repsize[1] = len; + req->rq_replen = lustre_msg_size(2, repsize); rc = ptlrpc_queue_wait(req); if (rc) GOTO(out, rc); - ptr = lustre_swab_repbuf(req, 0, sizeof(*desc), - lustre_swab_llog_desc); - if (ptr == NULL) { - CERROR ("Can't unpack llog_desc\n"); + body = lustre_swab_repbuf(req, 0, sizeof(*body), + lustre_swab_llogd_body); + if (body == NULL) { + CERROR ("Can't unpack llogd_body\n"); GOTO(out, rc =-EFAULT); } - memcpy(desc, ptr, sizeof(*desc)); - ptr = lustre_msg_buf(req->rq_repmsg, 1, LLOG_CHUNK_SIZE); + ptr = lustre_msg_buf(req->rq_repmsg, 1, len); if (ptr == NULL) { CERROR ("Can't unpack bitmap\n"); GOTO(out, rc =-EFAULT); } - memcpy(buf, ptr, buf_size); + + *cur_idx = body->lgd_saved_index; + *cur_offset = body->lgd_cur_offset; + + memcpy(buf, ptr, len); + +out: + if (req) + ptlrpc_req_finished(req); + RETURN(rc); +} + + +int llogd_client_read_header(struct llog_handle *handle) +{ + struct obd_import *imp = llog_lgh2imp(handle); + struct ptlrpc_request *req = NULL; + struct llogd_body *body; + struct llog_log_hdr *hdr; + int size = sizeof(*body); + int repsize = sizeof (*hdr); + int rc; + ENTRY; + + req = ptlrpc_prep_req(imp, LLOGD_READ_HEADER, 1, &size, NULL); + if (!req) + GOTO(out, rc = -ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body)); + body->lgd_logid = handle->lgh_id; + + req->rq_replen = lustre_msg_size(1, &repsize); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); + + hdr = lustre_swab_repbuf(req, 0, sizeof(*hdr), + lustre_swab_llog_hdr); + if (hdr == NULL) { + CERROR ("Can't unpack llog_hdr\n"); + GOTO(out, rc =-EFAULT); + } + memcpy(handle->lgh_hdr, hdr, sizeof (*hdr)); out: if (req) @@ -281,11 +396,28 @@ out: RETURN(rc); } +int llogd_client_close(struct llog_handle *handle) +{ + int rc = 0; + + RETURN(rc); +} -int llogd_client_close(struct obd_export *exp, struct llog_desc **desc, - __u32 **bitmap) +int llogd_client_write_rec(struct llog_handle *loghandle, + struct llog_rec_hdr *rec, + struct llog_cookie *logcookies, + int numcookies, + void *buf, + int idx) { - OBD_FREE(*desc, sizeof (**desc)); - OBD_FREE(*bitmap, LLOG_BITMAP_BYTES); - RETURN(0); + int rc = 0; + RETURN(rc); } + +struct llog_operations llogd_client_ops = { +// lob_write_rec: llogd_client_write_rec, + lop_next_block: llogd_client_next_block, + lop_read_header: llogd_client_read_header, + lop_create: llogd_client_create, + lop_close: llogd_client_close, +}; diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index d0a8da5..8486059 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -252,12 +252,15 @@ EXPORT_SYMBOL(ptlrpc_lprocfs_register_obd); EXPORT_SYMBOL(ptlrpc_lprocfs_unregister_obd); /* llogd.c */ -EXPORT_SYMBOL(llogd_init); +EXPORT_SYMBOL(llogd_create); EXPORT_SYMBOL(llogd_next_block); +EXPORT_SYMBOL(llogd_read_header); EXPORT_SYMBOL(llogd_close); -EXPORT_SYMBOL(llogd_client_init); +EXPORT_SYMBOL(llogd_client_create); EXPORT_SYMBOL(llogd_client_next_block); +EXPORT_SYMBOL(llogd_client_read_header); EXPORT_SYMBOL(llogd_client_close); +EXPORT_SYMBOL(llogd_client_ops); #ifdef __KERNEL__ MODULE_AUTHOR("Cluster File Systems, Inc. "); -- 1.8.3.1