成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

怎么用C語言與java實(shí)現(xiàn)kafkaavro生產(chǎn)者和消費(fèi)者

本篇內(nèi)容介紹了“怎么用C語言與java實(shí)現(xiàn)kafka avro生產(chǎn)者和消費(fèi)者”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

成都創(chuàng)新互聯(lián)公司長期為上1000家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為錫林浩特企業(yè)提供專業(yè)的成都網(wǎng)站建設(shè)、網(wǎng)站制作,錫林浩特網(wǎng)站改版等技術(shù)服務(wù)。擁有10余年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。

原始數(shù)據(jù)格式

請(qǐng)求IP 應(yīng)答IP 域名 類型

3183375114 3729673322 "mx.hc.spdrb.com" A

以上數(shù)據(jù)是test文件的內(nèi)容

schema定義如下

{

    "type":"record",

    "name":"data",

    "fields":

    [

         {"name":"qip","type":"long"},

        {"name":"aip","type":"long"},

        {"name":"domain","type":"string"},

         {"name":"type","type":"string"}

     ]

}

C語言生產(chǎn)者代碼如下

#include <stdio.h> 
#include <unistd.h>
#include <stdlib.h> 
#include <string.h> 
#include "avro.h" 
#include "producer.h"

const char PERSON_SCHEMA[] = "{"type":"record","name":"data","fields":[{"name":"qip","type":"long"},{"name":"aip","type":"long"},{"name":"domain","type":"string"},{"name":"type","type":"string"}]}";

const char *file = "avro_file.dat";

const char *brokers = "xxxxx:9092"; const char *topic = "topic1";

void print_avro_value(avro_value_t *value) { char *json; if (!avro_value_to_json(value, 1, &json)) { printf("%s\n", json); free(json); } }

if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA),
            &test_schema, &error)) {
    fprintf(stderr, "schema error\n");
    exit(EXIT_FAILURE);
}

return test_schema;
avro_schema_t init_schema() { avro_schema_t test_schema; avro_schema_error_t error;


}

void add_data(avro_writer_t writer, avro_schema_t schema, int64_t qip, uint64_t aip, const char* domain, const char* type) 
{ 
     avro_datum_t data = avro_record(schema); 
     avro_datum_t dqip = avro_int64(qip); 
     avro_datum_t daip = avro_int64(aip); 
     avro_datum_t ddomain = avro_string(domain); 
     avro_datum_t dtype = avro_string(type);
     avro_record_set(data, "qip", dqip);
     avro_record_set(data, "aip", daip);
     avro_record_set(data, "domain", ddomain);
     avro_record_set(data, "type", dtype);

     avro_write_data(writer, NULL, f2c);

     avro_datum_decref(dqip);
     avro_datum_decref(daip);
     avro_datum_decref(ddomain);
     avro_datum_decref(dtype);
     avro_datum_decref(data);
}
int main(int argc, char* argv[]) 
{ 
    int len = 0; 
    avro_schema_t schema; 
    avro_writer_t mem_writer; 
    char buf[1024]; 
    char tmp[4][500]={{0x00}};
    FILE *fp = fopen("test","r");
    if(!fp)
    {
        printf("open test file error!\n");
        return -1;
    }
    schema = init_schema();
    mem_writer = avro_writer_memory(buf, 1024);

    while(fgets(buf, 1024,fp)!=NULL)
    {
        if(buf[strlen(buf)] == '\n') buf[strlen(buf)] = '\0';
        if(sscanf(buf, "%s%s%s%s", tmp[0],tmp[1],tmp[2],tmp[3])!=4) continue;
        add_data(mem_writer,schema,atol(tmp[0]),atol(tmp[1]),tmp[2],tmp[3]);
        printf("data len = %ld\n", avro_writer_tell(mem_writer));
        len = avro_writer_tell(mem_writer);
        kafka_putdata(buf, len,brokers,topic);//librdkafka實(shí)現(xiàn)的生產(chǎn)者代碼 未列出
        memset(tmp, 0x00, sizeof(tmp));
        memset(buf, 0x00, sizeof(buf));
        avro_writer_reset(mem_writer);
    }

    fclose(fp);
    avro_writer_free(mem_writer);
    return 0;
}

C語言實(shí)現(xiàn)的消費(fèi)者如下

#include "consumer.h"
 #include "avro.h"
 #include <time.h> 
#include <unistd.h>


const char *brokers = "xxxx:9092"; const char *topic = "topic1"; const char *group = "avrotest"; const char PERSON_SCHEMA[] = "{"type":"record","name":"data","fields":[{"name":"qip","type":"long"},{"name":"aip","type":"long"},{"name":"domain","type":"string"},{"name":"type","type":"string"}]}";

avro_schema_t init_schema() 
{
    avro_schema_t test_schema; 
    avro_schema_error_t error;


    if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA),
            &test_schema, &error)) {
        fprintf(stderr, "schema error\n");
        exit(EXIT_FAILURE);
    }

  return test_schema;
}
void print_data(avro_reader_t reader, avro_schema_t schema) { avro_datum_t data; if(avro_read_data(reader, schema, schema, &data) == 0) 
{ 
    int64_t qip; 
    int64_t aip; 
    char *domain; 
    char *type; 
    avro_datum_t q_datum,a_datum,d_datum,t_datum;
    avro_record_get(data, "qip", &q_datum);
    avro_int64_get(q_datum, &qip);
    avro_record_get(data, "aip", &a_datum);
    avro_int64_get(a_datum, &aip);

    avro_record_get(data, "domain", &d_datum);
    avro_string_get(d_datum, &domain);

    avro_record_get(data, "type", &t_datum);
    avro_string_get(t_datum, &type);
    printf("qip: %lld, aip: %lld,domain: %s,type:%s\n", qip,aip,domain,type);

    avro_datum_decref(data);
}

int main(int argc, char* argv[]) 
{
    rd_kafka_t *rk; 
    rd_kafka_topic_partition_list_t *topics; 
    if(initKafka(&rk, brokers, group, topic, &topics)<0){return -1;} 
    char buf[1024] = {0x00}; 
    int len = 0; 
    avro_schema_t schema; 
    avro_reader_t mem_reader; 
    schema = init_schema(); 
    mem_reader = avro_reader_memory(buf, 1024);
    while(1) 
    { 
        get_consumer_msg(rk, buf, &len); //librdkafka實(shí)現(xiàn)的消費(fèi)者 代碼未列出 
        if(len == 0) continue; 
        printf("len=%d\n",len); 
        print_data(mem_reader,schema); 
        avro_reader_reset(mem_reader); 
        memset(buf, 0x00, sizeof(buf)); 
    } 
    return 0;
}

C編譯的Makefile如下 兩個(gè)C程序通用

TARGET=avro-test 
INCLUDE=./avrolib/include/ 
SLIB=./avrolib/lib/libavro.a 
DLIB=-lz -llzma -lrdkafka 
INC = -I. -I./avrolib/include SOURCES =$(wildcard *.c) 
OBJECTS =$(SOURCES:.c=.o) 
RM=rm -rf 
CC=gcc -g 
CFLAGS= -Wall $(INC) 
all:$(TARGET) 
$(TARGET): $(OBJECTS) 
    $(CC) -o $@ $? $(SLIB) $(DLIB) $(CFLAGS) 
:$(SOURCES) 
    $(CC) -c 
clean: 
    $(RM) $(TARGET) $(OBJECTS) *~

java消費(fèi)者 gradle配置

dependencies { 
    testCompile group: 'junit', name: 'junit', version: '4.12' 
    compile group: 'org.apache.avro', name: 'avro', version: '1.9.1' 
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0' 
}

avro解析 借鑒別人 言作者未知 請(qǐng)作者見諒

package zc;


import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.specific.SpecificDatumReader;
import java.io.IOException;

public class MyRecordDecoder { 
    public static GenericRecord genericRecord; 
    datumReader; static MyRecordDecoder myRecordDecoder = new MyRecordDecoder(); 
    final String USER_SCHEMA = "{"type":"record","name":"data","fields":[{"name":"qip","type":"long"},{"name":"aip","type":"long"},{"name":"domain","type":"string"},{"name":"type","type":"string"}]}";
     public MyRecordDecoder() { 
            Schema schema = null; schema = new Schema.Parser().parse(USER_SCHEMA); 
            datumReader = new SpecificDatumReader<GenericRecord>(schema);
    }

    public GenericRecord getGenericRecord(BinaryDecoder decoder, byte[] value) throws IOException{
            return datumReader.read(null, decoder);
    }

    public static MyRecordDecoder getInstance() {
        if (myRecordDecoder==null)
            myRecordDecoder = new MyRecordDecoder();
        return myRecordDecoder;
    }
}

java消費(fèi)者 package zc;

import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.io.BinaryDecoder; 
import org.apache.avro.io.DecoderFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections; import java.util.Properties;

public class KafkaMessageAvro{ public static void main(String[] args) throws Exception { 
    String inTopic = args[0]; 
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "xxxxx:9092"); 
    props.setProperty("group.id", "flink-topn-group"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Collections.singletonList(inTopic));
   
 try {
        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<String, byte[]> record : records) {
                byte[] ss = record.value();
                if (ss==null) {
                    continue;
                }
                System.out.println(ss.toString());
                GenericRecord genericRecord = null;
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(ss, null);
                while (!decoder.isEnd()) {
                    genericRecord = MyRecordDecoder.getInstance().getGenericRecord(decoder, ss);
                   System.out.println(genericRecord.get("qip").toString()+" "+genericRecord.get("aip").toString()+" "+genericRecord.get("domain").toString()+" "+genericRecord.get("type").toString());
                }

            }
        }
    } finally {
        consumer.close();
    }
}

“怎么用C語言與java實(shí)現(xiàn)kafka avro生產(chǎn)者和消費(fèi)者”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

新聞標(biāo)題:怎么用C語言與java實(shí)現(xiàn)kafkaavro生產(chǎn)者和消費(fèi)者
網(wǎng)站網(wǎng)址:http://jinyejixie.com/article48/ggesep.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供商城網(wǎng)站網(wǎng)站設(shè)計(jì)公司、移動(dòng)網(wǎng)站建設(shè)、網(wǎng)站內(nèi)鏈、外貿(mào)建站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化
台中县| 津市市| 湾仔区| 容城县| 肇东市| 泰安市| 凤山市| 固镇县| 佳木斯市| 龙泉市| 丹江口市| 太保市| 岢岚县| 平和县| 阿瓦提县| 商城县| 三原县| 台北县| 阜康市| 普兰县| 巫山县| 樟树市| 建阳市| 彭泽县| 南开区| 南开区| 武汉市| 辉南县| 习水县| 巴里| 通渭县| 永安市| 永平县| 道真| 驻马店市| 阜南县| 来宾市| 大宁县| 汤阴县| 西宁市| 读书|