Menu
Woocommerce Menu

(文/开源中国)    ,距离Python 3522vip靠谱吗3.8.1 rc1发布没多久的时间

0 Comment


Core and Builtins

bpo-38811:修复缺少os.link()时pathlib中未处理的异常。

    ZOO_LOG4J_PROP=”INFO,ROLLINGFILE

代码实现

package com.nsfocus.bsa.example;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

/**
 * Checkpoint example
 *
 * @author Shuai YUAN
 * @date 2015/10/27
 */
public class CheckpointTest {

    private static String CHECKPOINT_DIR = "/checkpoint";

    public static void main(String[] args) {

        // get javaStreamingContext from checkpoint dir or create from sparkconf
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                return createContext();
            }
        });

        jssc.start();
        jssc.awaitTermination();

    }

    public static JavaStreamingContext createContext() {

        SparkConf sparkConf = new SparkConf().setAppName("tachyon-test-consumer");

        Set<String> topicSet = new HashSet<String>();
        topicSet.add("test_topic");

        HashMap<String, String> kafkaParam = new HashMap<String, String>();
        kafkaParam.put("metadata.broker.list", "test1:9092,test2:9092");

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // do checkpoint metadata to hdfs
        jssc.checkpoint(CHECKPOINT_DIR);

        JavaPairInputDStream<String, String> message =
                KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParam,
                        topicSet
                );

        JavaDStream<String> valueDStream = message.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> v1) throws Exception {
                return v1._2();
            }
        });
        valueDStream.count().print();

        return jssc;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • bpo-39080:当星标表达式 AST
    节点位于 Call AST 节点的 args 属性中的元素之中时,修复它们的
    end_col_offset 值。

  • bpo-39031:解析“elif”节点时,该节点的
    lineno 和 col_offset
    现在指向“elif”关键字,而不是其条件,从而使其与“if”节点一致。

  • bpo-39008:PySys_Audit()现在要求Py_ssize_t将格式字符串中的
    size 参数用作大小参数,而不管 PY_SSIZE_T_CLEAN在包括时定义了什么。

bpo-38546:Multiprocessing和current.futures测试现在会在测试完成时停止资源跟踪器进程。


次集群单点发生故障恢复后,都需要进行重新选举才能彻底恢复集群的leader分配,如果嫌每次这样做很麻烦,可以在broker的配置文件(即
server.properties)中配置auto.leader.rebalance.enable=true,这样broker在启动后就会自动进
行重新选举

源码实现

scala的实现网上很容易搜到,这里贴个java实现的代码。

package com.xueba207.test;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConversions;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
 * KafkaOffsetExample
 *
 * @author Shuai YUAN
 * @date 2015/10/28
 */
public class KafkaOffsetExample {

    private static KafkaCluster kafkaCluster = null;

    private static HashMap<String, String> kafkaParam = new HashMap<String, String>();

    private static Broadcast<HashMap<String, String>> kafkaParamBroadcast = null;

    private static scala.collection.immutable.Set<String> immutableTopics = null;

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("tachyon-test-consumer");

        Set<String> topicSet = new HashSet<String>();
        topicSet.add("test_topic");


        kafkaParam.put("metadata.broker.list", "test:9092");
        kafkaParam.put("group.id", "com.xueba207.test");

        // transform java Map to scala immutable.map
        scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam);
        scala.collection.immutable.Map<String, String> scalaKafkaParam =
                testMap.toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
                    public Tuple2<String, String> apply(Tuple2<String, String> v1) {
                        return v1;
                    }
                });

        // init KafkaCluster
        kafkaCluster = new KafkaCluster(scalaKafkaParam);

        scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet);
        immutableTopics = mutableTopics.toSet();
        scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet2 = kafkaCluster.getPartitions(immutableTopics).right().get();

        // kafka direct stream 初始化时使用的offset数据
        Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>();

        // 没有保存offset时(该group首次消费时), 各个partition offset 默认为0
        if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).isLeft()) {

            System.out.println(kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).left().get());

            Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet2);

            for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
                consumerOffsetsLong.put(topicAndPartition, 0L);
            }

        }
        // offset已存在, 使用保存的offset
        else {

            scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster.getConsumerOffsets("com.nsfocus.bsa.ys.test", topicAndPartitionSet2).right().get();

            Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp);

            Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet2);

            for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
                Long offset = (Long)consumerOffsets.get(topicAndPartition);
                consumerOffsetsLong.put(topicAndPartition, offset);
            }

        }

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
        kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);

        // create direct stream
        JavaInputDStream<String> message = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParam,
                consumerOffsetsLong,
                new Function<MessageAndMetadata<String, String>, String>() {
                    public String call(MessageAndMetadata<String, String> v1) throws Exception {
                        return v1.message();
                    }
                }
        );

        // 得到rdd各个分区对应的offset, 并保存在offsetRanges中
        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
        JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
            public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                offsetRanges.set(offsets);
                return rdd;
            }
        });

        // output
        javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {

            public Void call(JavaRDD<String> v1) throws Exception {
                if (v1.isEmpty()) return null;

                //处理rdd数据,这里保存数据为hdfs的parquet文件
                HiveContext hiveContext = SQLContextSingleton.getHiveContextInstance(v1.context());
                DataFrame df = hiveContext.jsonRDD(v1);
                df.save("/offset/test", "parquet", SaveMode.Append);


                for (OffsetRange o : offsetRanges.get()) {

                    // 封装topic.partition 与 offset对应关系 java Map
                    TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());
                    Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>();
                    topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());

                    // 转换java map to scala immutable.map
                    scala.collection.mutable.Map<TopicAndPartition, Object> testMap =
                            JavaConversions.mapAsScalaMap(topicAndPartitionObjectMap);
                    scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap =
                            testMap.toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() {
                                public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) {
                                    return v1;
                                }
                            });

                    // 更新offset到kafkaCluster
                    kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap);

//                    System.out.println(
//                            o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
//                    );
                }
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }

}

macOS

bpo-36406:处理doctest中的namespace packages。

    跑一下,输出是这样的:

保存起来便于查阅

Tests

距离Python 3.8.1 rc1发布没多久的时间,目前,Python
3.8.1也已正式发布。Python 3.8.1是Python 3.8的第一个维护版本,Python
3.8系列是Python编程语言的最新主要版本,它包含了许多新功能和优化。

 

流程图

Startcheckpoint存在?从checkpoint得到sparkStreamingContextcheckpoint
sparkStreamingContext数据到hdfs/tachyon读取数据启动task,处理数据End新建sparkStreamingContextyesno

  • bpo-38295:防止在 macOS
    Catalina 上的 test_py_compile 中的 test_relative_path 失败。

IDLE

1) 改造一下KafkaUtil类,加入Consumer client的构造。

自己实现保存offset到zk

开发者可以自己开发保存offset到zk的实现逻辑。spark streaming
的rdd可以被转换为HasOffsetRanges类型,进而得到所有partition的offset。

  • bpo-39007:将 auditing events
    添加到winreg中的函数。

Tests

   
配置项的详细说明请见官方文档:

实现流程

start初始化kafka连接参数初始化kafka
cluster对象利用kafka连接参数得到offsets集合出现异常?设置offsets为0初始化sparkStreamingContext初始化Kafka对应的DStream得到DStream中rdd对应的offsets处理数据…更新offset到kafka
clusterendyesno

IDLE

bpo-39008:PySys_Audit()现在要求Py_ssize_t将格式字符串中的size参数用作大小参数,而不管PY_SSIZE_T_CLEAN在包括时定义了什么。

 

问题描述

最近使用spark
streaming处理kafka的数据,业务数据量比较大,就使用了kafkaUtils的createDirectStream()方式,此方法直接从kafka的broker的分区中读取数据,跳过了zookeeper,并且没有receiver,是spark的task直接对接kakfa
topic
partition,能保证消息恰好一次语意,但是此种方式因为没有经过zk,topic的offset也就没有保存,当job重启后只能从最新的offset开始消费消息,造成重启过程中的消息丢失。

Library

bpo-38295:防止在macOS
Catalina上的test_py_compile中的test_relative_path失败。

 

转自:

Windows

bpo-39031:解析“elif”节点时,该节点的lineno和col_offset现在指向“elif”关键字,而不是其条件,从而使其与“if”节点一致。

 

checkpoint机制

spark streaming job 可以通过checkpoint
的方式保存job执行断点,断点中有spark streaming
context中的全部信息(包括有kakfa每个topic
partition的offset)。checkpoint有两种方式,一个是checkpoint
数据和metadata,另一个只checkpoint
metadata,一般情况只保存metadata即可,因此这里只介绍checkpoint
metadata。

Python 3.8.1 版本的更新内容如下:

bpo-38943:修复IDLE自动完成窗口不总是出现在某些系统上的问题

经过debug发现,连接到集群是成功的,但连接到集群后更新回来的集群meta信息却是错误的:3522vip靠谱吗 1 
能够看到,metadata中的Cluster信息,节点的hostname是iZ25wuzqk91Z这样的一串数字,而不是实际的ip地址
10.0.0.100和101。iZ25wuzqk91Z其实是远端主机的hostname,这说明在没有配置advertised.host.name
的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置
hosts,所以自然是连接不上这个hostname的。要解决这一问题,把host.name和advertised.host.name都配置成绝对
的ip地址就可以了。

解决方案

一般,有两种方式可以先spark streaming 保存offset:spark
checkpoint机制和程序中自己实现保存offset逻辑,下面分别介绍。

  • bpo-38944:Excape
    key 现在会关闭 IDLE completion windows

  • bpo-38943:修复 IDLE
    自动完成窗口不总是出现在某些系统上的问题

Library

1) 查看当前Topic的状态:

详细信息:

Core and Builtins

2) 启动Producer向Kafka集群发送消息

  • bpo-39022:更新
    importliib.metadata 以包括对 importlib_metadata 1.3
    的改进,包括更好的 EntryPoints 序列化和改进的自定义查找器文档。

  • bpo-38811:修复缺少os.link()pathlib中未处理的异常。

  • bpo-36406:处理doctest中的
    namespace packages。

Python 3.8.1版本的更新内容如下:

  • 可自由控制Worker线程的数量,不受Partition数量限制

(文/开源中国)    

bpo-39007:将auditing events添加到winreg中的函数。

[kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.common.network.Selector – Connection with /10.0.0.100
disconnected 
java.net.ConnectException: Connection refused: no further information 
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
    at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) 
    at
org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54) 
    at
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72) 
    at
org.apache.kafka.common.network.Selector.poll(Selector.java:274) 
    at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) 
    at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) 
    at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) 
    at java.lang.Thread.run(Thread.java:745)
[kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.Metadata – Updated cluster metadata version 7
to Cluster(nodes = [Node(1, 10.0.0.101, 9092)], partitions =
[Partition(topic = test, partition = 1, leader = 1, replicas = [1,],
isr = [1,], Partition(topic = test, partition = 0, leader = 1,
replicas = [1,], isr = [1,]])

  • bpo-38546:Multiprocessing 和
    current.futures 测试现在会在测试完成时停止资源跟踪器进程。

bpo-38944:Excape key现在会关闭IDLE completion windows

    接下来,我们创建一个名为test,拥有两个分区,两个副本的Topic:

macOS

 

bpo-39080:当星标表达式AST节点位于Call
AST节点的args属性中的元素之中时,修复它们的end_col_offset值。

 

Windows

 
在调用KafkaProducer的send方法时,可以注册一个回调方法,在Producer端完成发送后会触发回调逻辑,在回调方法的
metadata对象中,我们能够获取到已发送消息的offset和落在的分区等信息。注意,如果acks配置为0,依然会触发回调逻辑,只是拿不到
offset和消息落地的分区信息。

3522vip靠谱吗 2

    至此,zookeeper集群的安装和高可用性验证完成。

bpo-39022:更新importliib.metadata以包括对importlib_metadata
1.3的改进,包括更好的EntryPoints序列化和改进的自定义查找器文档。

 

弊端:

 

message send to partition 0, offset: 28 
message send to partition 1, offset: 26 
message send to partition 0, offset: 29 
message send to partition 1, offset: 27 
message send to partition 1, offset: 28 
message send to partition 0, offset: 30 
message send to partition 0, offset: 31 
message send to partition 1, offset: 29 
message send to partition 1, offset: 30 
message send to partition 1, offset: 31 
message send to partition 0, offset: 32 
message send to partition 0, offset: 33 
message send to partition 0, offset: 34 
message send to partition 1, offset: 32

 

tar -xzvf kafka_2.11-0.9.0.1.tgz

    然后重启zookeeper,就ok了

  public static void main(String[] args) throws Exception{
        KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
        String topic = "test";
        TopicPartition partition0 = new TopicPartition(topic, 0);
        TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0, partition1));
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for(ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
            }
            consumer.commitSync();
        }
    }
bin/zkServer.sh status

    假设目前3个zk节点中,server0为leader,server1和server2为follower

 

 

 
原因是Producer端在尝试向broker1的parition0发送消息时,partition0的leader已经切换成了broker0,所以消息发送失败。

 

 
全部的Consumer配置见官方文档:

public class KafkaTest {
    public static void main(String[] args) throws Exception{
        Producer<String, String> producer = KafkaUtil.getProducer();
        int i = 0;
        while(true) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null)
                        e.printStackTrace();
                    System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
                }
            });
            i++;
            Thread.sleep(1000);
        }
    }
}

 
我们看到,broker0启动起来了,并且已经是in-sync状态(注意Isr从1变成了1,0),但此时两个partition的leader还都是
broker1,也就是说当前broker1会承载所有的发送和拉取请求。这显然是不行的,我们要让集群恢复到负载均衡的状态。

   
接下来,我们在另一台主机也完成Kafka的安装和配置,然后在两台主机上分别启动Kafka:

  再次查看Topic状态:

  可以看到,集群重新回到了broker0挂掉之前的状态

  • bootstrap.servers:Kafka集群连接串,可以由多个host:port组成
  • acks:broker消息确认的模式,有三种:
    0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认
    1:由Leader确认,Leader接收到消息后会立即返回确认信息
    all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息
    我们可以根据消息的重要程度,设置不同的确认模式。默认为1
  • retries:发送失败时Producer端的重试次数,默认为0
  • batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都DuLi发送。默认为16384字节
  • linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0
  • key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定
  • buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)

 

 
如上,我们也通过测试证实了Kafka集群出现单点故障时,Consumer端的功能正确性。

  Producer端的常用配置

1. 安装Java环境

    此处的坑:

    创建完成后,使用如下命令查看Topic状态:

 

 
能看到Producer端的DEBUG日志显示与broker0的链接断开了,此时Kafka立刻开始更新集群metadata,更新后的metadata表示broker1现在是两个partition的leader,Producer进程很快就恢复继续运行,没有漏发任何消息,能够看出Kafka集群的故障切换机制还是很厉害的

  输出:

    此处选用Zookeeper的版本是3.4.6,此为Kafka0.9中推荐的Zookeeper版本。

 

org.apache.kafka.common.errors.NotLeaderForPartitionException: This
server is not the leader for that topic-partition.

 

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

相关文章

网站地图xml地图