go问题

执行go fmt或go build命令报错:

Get “https://proxy.golang.org/github.com/gin-gonic/gin/@v/list”: dial tcp 172.217.160.81:443: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond

解决:执行命令:

go env -w GOPROXY=https://goproxy.cn

go安装gin框架

1,把下载好的包放到go安装目录src下
2.代码:test.go  运行>go run test.go
package main

import "github.com/gin-gonic/gin"

func main() {
   r := gin.Default()
   r.GET("/ping", func(c *gin.Context) {
      c.JSON(200, gin.H{
         "message": "pong",
      })
   })
   r.Run() // listen and serve on 0.0.0.0:8080
}
3.浏览器输入http://127.0.0.1:8080/ping

显示:

{"message":"pong"}

linux超线程

超线程是英特尔开发出来的一项技术,使得单个处理器可以象两个逻辑处理器那样运行,这样单个处理器以并行执行线程。

非超线程:cpu线程数=物理CPU个数*每个物理CPU的逻辑核数*1

超线程:cpu线程数=物理CPU个数*每个物理CPU的逻辑核数*2

物理CPU个数:[root@localhost ~]# cat /proc/cpuinfo | grep “physical id” | sort | uniq

每个物理CPU的逻辑核数:[root@localhost ~]# cat /proc/cpuinfo | grep “cores” | uniq

系统整个cpu线程数: cat /proc/cpuinfo | grep “processor” | wc -l

查看内核信息:[root@localhost ~]# cat /proc/cpuinfo | grep “model name” | uniq

根据physical id信息可以判断哪些逻辑核在同一个物理核上,因为同一物理核上的逻辑核的physical id相等;而根据core id信息又可以判断哪两个cpu线程跑在同一个逻辑核上,因为跑在同一逻辑核上的cpu线程的core id相等,但是,由于不同物理核上的逻辑核core id可以相等,所以在进行第二个判断(即哪两个cpu线程跑在同一个逻辑核上)前需要先进行第一个判断(即哪些逻辑核在同一个物理核上)

go环境搭建

安装GO

1.下载安装包go1.15.windows-amd64.msi点击安装到自定义目录

2.配置:新加环境系统变量

PATH  D:\desktoptool\go\bin  //可执行文件目录

GOPATH  D:\desktoptool\gocode //代码目录

GOROOT  D:\desktoptool\go  //安装目录

3.cmd窗口输入下面命令查看:

D:\desktoptool\gocode>go version
go version go1.15 windows/amd64

D:\desktoptool\gocode>go env
set GO111MODULE=
set GOARCH=amd64
set GOBIN=
set GOCACHE=C:\Users\14065\AppData\Local\go-build
set GOENV=C:\Users\14065\AppData\Roaming\go\env
set GOEXE=.exe
set GOFLAGS=
set GOHOSTARCH=amd64
set GOHOSTOS=windows
set GOINSECURE=
set GOMODCACHE=C:\Users\14065\go\pkg\mod
set GONOPROXY=
set GONOSUMDB=
set GOOS=windows
set GOPATH=C:\Users\14065\go
set GOPRIVATE=
set GOPROXY=https://proxy.golang.org,direct
set GOROOT=D:\desktoptool\go
set GOSUMDB=sum.golang.org
set GOTMPDIR=
set GOTOOLDIR=D:\desktoptool\go\pkg\tool\windows_amd64
set GCCGO=gccgo
set AR=ar
set CC=gcc
set CXX=g++
set CGO_ENABLED=1
set GOMOD=
set CGO_CFLAGS=-g -O2
set CGO_CPPFLAGS=
set CGO_CXXFLAGS=-g -O2
set CGO_FFLAGS=-g -O2
set CGO_LDFLAGS=-g -O2
set PKG_CONFIG=pkg-config
set GOGCCFLAGS=-m64 -mthreads -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=C:\Users\14065\AppData\Local\Temp\go-build611408406=/tmp/go-build -gno-record-gcc-switches

安装IDE goland

 

 

 

 

 

kafka分区

首先我们来看下官网的图示,kafka分区的作用个人觉得就是提供一种负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。

分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。

如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class。这个参数该怎么设定呢?方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()close(),通常你只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:

		int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
  • 这里的topickeykeyBytesvaluevalueBytes都属于消息数据,cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。只要你自己的实现类定义好了 partition 方法,同时设置partitioner.class参数为你自己实现类的 Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区。虽说可以有无数种分区的可能,但比较常见的分区策略也就那么几种,下面我来详细介绍一下。
轮询策略

也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,就像下面这张图展示的那样。

在这里插入图片描述

这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。

轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

随机策略

也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上,如下面这张图所示。

在这里插入图片描述

如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可:

		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
		return ThreadLocalRandom.current().nextInt(partitions.size());

先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。

本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。

按消息key保存策略

也称 Key-ordering 策略。这个可以理解为是自定义的策略之一。

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。

在这里插入图片描述

实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:

		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
		return Math.abs(key.hashCode()) % partitions.size();

前面提到的 Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息Key保序策略;如果没有指定 Key,则使用轮询策略。

其他分区策略

上面这几种分区策略都是比较基础的策略,除此之外你还能想到哪些有实际用途的分区策略?其实还有一种比较常见的,即所谓的基于地理位置的分区策略。当然这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群

假设有个厂商所有服务都部署在北京的一个机房(这里我假设它是自建机房,不考虑公有云方案。其实即使是公有云,实现逻辑也差不多),现在该厂商考虑在南方找个城市(比如深圳)再创建一个机房;另外从两个机房中选取一部分机器共同组成一个大的 Kafka 集群。显然,这个集群中必然有一部分机器在北京,另外一部分机器在深圳。

假设该厂商计划为每个新注册用户提供一份注册礼品,比如南方的用户注册的可以免费得到一碗“甜豆腐脑”,而北方的新注册用户可以得到一碗“咸豆腐脑”。如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理南北方注册用户逻辑即可。

但问题是你需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房中,因为处理这些消息的消费者程序只可能在某一个机房中启动着。换句话说,送甜豆腐脑的消费者程序只在深圳机房启动着,而送咸豆腐脑的程序只在北京的机房中,如果你向深圳机房中的 Broker 发送北方注册用户的消息,那么这个用户将无法得到礼品!

此时我们就可以根据 Broker 所在的 IP 地址实现定制化的分区策略。比如下面这段代码:

		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
		return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

我们可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。

切记分区是实现负载均衡以及高吞吐量的关键,所以一定要在生产者这一端就要考虑好合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,从而导致下游数据消费的性能下降的问题

rabbitmq消息确认和持久化

消息确认:

消费消息确认模式有:

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认
  • AcknowledgeMode.MANUAL:手动确认
  • 1、可以自动创建队列,也可以手动创建队列,如果自动创建队列,那么是谁负责创建队列呢?是生产者?还是消费者?
    如果队列不存在,当然消费者不会收到任何的消息。但是如果队列不存在,那么生产者发送的消息就会丢失。所以,为了数据不丢失,消费者和生产者都可以创建队列。那么如果创建一个已经存在的队列呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是队列属性并不会改变。队列对于负载均衡的处理是完美的。对于多个消费者来说,RabbitMQ使用轮询的方式均衡的发送给不同的消费者。

    2、RabbitMQ的消息确认机制

    默认情况下,如果消息已经被某个消费者正确的接收到了,那么该消息就会被从队列中移除。当然也可以让同一个消息发送到很多的消费者。

    如果一个队列没有消费者,那么,如果这个队列有数据到达,那么这个数据会被缓存,不会被丢弃。当有消费者时,这个数据会被立即发送到这个消费者,这个数据被消费者正确收到时,这个数据就被从队列中删除。

    那么什么是正确收到呢?通过ack。每个消息都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。如果有数据没有被ack,那么:

    RabbitMQ Server会把这个信息发送到下一个消费者。

    如果这个app有bug,忘记了ack,那么RabbitMQServer不会再发送数据给它,因为Server认为这个消费者处理能力有限。

    而且ack的机制可以起到限流的作用(Benefitto throttling):在消费者处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的均衡消费者的负载。

    消息持久化:

rabbitmq生产与消费php代码剖析

RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种

fanout(广播)

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,转发消息是最快的。

direct(定向)

direct类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。

topic(通配符)

前面讲到direct类型的交换器路由规则是完全匹配BindingKey和RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器相似,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,但这里的匹配规则有些不同,它约定:

(一)RoutingKey为一个点好“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如“com.rabbitmq.client”、“com.hidden.client”。

(二)BindingKey和RoutingKey一样也是点号“.”分隔的字符串。

(三)BindingKey中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配零个或多个单词。

headers

headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在

匹配规则x-match有下列两种类型:

x-match = all :表示所有的键值对都匹配才能接受到消息

x-match = any :表示只要有键值对匹配就能接受到消息

代码示例send.php

<?php

require 'vendor/autoload.php';
$conn = [
    // Rabbitmq 服务地址
    'host' => '192.168.222.129',
    // Rabbitmq 服务端口
    'port' => '5672',
    // Rabbitmq 帐号
    'login' => 'guest',
    // Rabbitmq 密码
    'password' => 'guest',
    'vhost'=>'/'
];

//创建连接和channel
$conn = new AMQPConnection($conn);
if(!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

// 用来绑定交换机和队列
$routingKey = 'key_1';

$ex = new AMQPExchange($channel);
//  交换机名称
$exchangeName = 'ex1';
$ex->setName($exchangeName);

// 设置交换机类型
$ex->setType(AMQP_EX_TYPE_DIRECT);
// 设置交换机是否持久化消息
$ex->setFlags(AMQP_DURABLE);
$ex->declare();

for($i=0; $i<5; ++$i){
    echo "Send Message:".$ex->publish(date('H:i:s')."用户".$i."注册" , $routingKey )."\n";
}

consume.php 创建队列,阻塞状态消费信息

<?php
/**
 * Des 描述
 * Date 2020/8/24
 * Time  10:32
 * User ycl
 */
require 'vendor/autoload.php';
$conn = [
    // Rabbitmq 服务地址
    'host' => '192.168.222.129',
    // Rabbitmq 服务端口
    'port' => '5672',
    // Rabbitmq 帐号
    'login' => 'guest',
    // Rabbitmq 密码
    'password' => 'guest',
    'vhost'=>'/'
];
try {

//创建连接和channel
    $conn = new AMQPConnection($conn);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!\n");
    }
    $channel = new AMQPChannel($conn);
    $exchangeName = 'ex1';

//创建交换机
    $ex = new AMQPExchange($channel);
    $ex->setName($exchangeName);

    $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
    $ex->setFlags(AMQP_DURABLE); //持久化交换机
    $ex->declare();

//  创建队列
    $queueName = 'queue1';
    $q = new AMQPQueue($channel);
    $q->setName($queueName);
    $q->setFlags(AMQP_DURABLE);//持久化队列
    $q->declareQueue();

// 用于绑定队列和交换机,跟 send.php 中的一致。
    $routingKey = 'key_1';
    $q->bind($exchangeName, $routingKey);

//接收消息
    $q->consume(function ($envelope, $queue) {
        $msg = $envelope->getBody();
        echo $msg . "\n"; //处理消息
    }, AMQP_AUTOACK);//消息自动确认

    $conn->disconnect();
}catch (AMQPException $e){
    echo $e->getMessage();
}

dockrer安装RabbitMq

RabbitMQ 常用的交换器类型有 fanout direct topic headers 这四种

   1  docker pull rabbitmq
   2  进入docker配置页面  docker exec -it 0e6840bd6241 /bin/bash
       执行安装客户端插件:rabbitmq-plugins enable rabbitmq_management
   3  运行容器: 5672用于php链接,15672用于客户端链接 
    docker run -itd --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq

测试:

浏览器输入:http://192.168.222.129:15672     默认用户名guest密码guest

php测试rabbitmq

composer安装扩展

"require": {
    "php-amqplib/php-amqplib": ">=2.6.1"
}

新建send.php 端口号为5672

<?php
/**
 * Des 描述
 * Date 2020/8/24
 * Time  10:31
 * User ycl
 */
require 'vendor/autoload.php';
$conn = [
    // Rabbitmq 服务地址
    'host' => '192.168.222.129',
    // Rabbitmq 服务端口
    'port' => '5672',
    // Rabbitmq 帐号
    'login' => 'guest',
    // Rabbitmq 密码
    'password' => 'guest',
    'vhost'=>'/'
];

//创建连接和channel
$conn = new AMQPConnection($conn);
if(!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

// 用来绑定交换机和队列
$routingKey = 'key_1';

$ex = new AMQPExchange($channel);
//  交换机名称
$exchangeName = 'ex1';
$ex->setName($exchangeName);

// 设置交换机类型
$ex->setType(AMQP_EX_TYPE_DIRECT);
// 设置交换机是否持久化消息
$ex->setFlags(AMQP_DURABLE);
$ex->declare();

for($i=0; $i<5; ++$i){
    echo "Send Message:".$ex->publish(date('H:i:s')."用户".$i."注册" , $routingKey )."\n";
}

新建consume.php 端口号为5672

<?php
/**
 * Des 描述
 * Date 2020/8/24
 * Time  10:32
 * User ycl
 */
require 'vendor/autoload.php';
$conn = [
    // Rabbitmq 服务地址
    'host' => '192.168.222.129',
    // Rabbitmq 服务端口
    'port' => '5672',
    // Rabbitmq 帐号
    'login' => 'guest',
    // Rabbitmq 密码
    'password' => 'guest',
    'vhost'=>'/'
];
try {

//创建连接和channel
    $conn = new AMQPConnection($conn);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!\n");
    }
    $channel = new AMQPChannel($conn);
    $exchangeName = 'ex1';

//创建交换机
    $ex = new AMQPExchange($channel);
    $ex->setName($exchangeName);

    $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
    $ex->setFlags(AMQP_DURABLE); //持久化
    $ex->declare();

//  创建队列
    $queueName = 'queue1';
    $q = new AMQPQueue($channel);
    $q->setName($queueName);
    $q->setFlags(AMQP_DURABLE);
    $q->declareQueue();

// 用于绑定队列和交换机,跟 send.php 中的一致。
    $routingKey = 'key_1';
    $q->bind($exchangeName, $routingKey);

//接收消息
    $q->consume(function ($envelope, $queue) {
        $msg = $envelope->getBody();
        echo $msg . "\n"; //处理消息
    }, AMQP_AUTOACK);

    $conn->disconnect();
}catch (AMQPException $e){
    echo $e->getMessage();
}

执行

先执行 php consume.php

新开窗口执行 php send.php

结果 consume.php执行窗口会有输出 成功

docker下使用redis配置文件启动

下载redis.conf文件到本地目录

/usr/local/docker/redis

修改 vim redis.conf

bind 127.0.0.1 #注释掉这部分,这是限制redis只能本地访问

protected-mode no #默认yes,开启保护模式,限制为本地访问

daemonize no#默认no,改为yes意为以守护进程方式启动,可后台运行,除非kill进程,改为yes会使配置文件方式启动redis失败

databases 16 #数据库个数(可选),我修改了这个只是查看是否生效。。

dir  ./ #输入本地redis数据库存放文件夹(可选)

appendonly yes #redis持久化(可选)

映射到docker,启动redis

[root@localhost /]# docker run -p 6379:6379 --name redis6 -v /usr/local/docker/redis/redis.conf:/etc/redis/redis.conf -v /usr/local/docker/redis/data:/data -d redis redis-server /etc/redis/redis.conf --appendonly yes

说明:
--appendonly yes 开启redis 持久化

 

docker安装应用

1,root账户登录,查看内核版本如下

 [root@localhost ~]# uname -a

2,把yum包更新到最新

 [root@localhost ~]# yum update
3,安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
[root@localhost ~]# yum install -y yum-utils device-mapper-persistent-data lvm2
4,设置yum源
 [root@localhost ~]# yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

5,可以查看所有仓库中所有docker版本,并选择特定版本安装

[root@localhost ~]# yum list docker-ce --showduplicates | sort -r

6,安装Docker,命令:yum install docker-ce-版本号,在这一步可能会报错

错误信息:问题: package docker-ce-3:19.03.12-3.el7.x86_64 requires containerd.io >= 1.2.2-3, but none of the providers can be installed
可以换一个版本试试
[root@localhost ~]# yum install docker-ce-17.12.1.ce
或者换成其他版本
[root@localhost ~]# yum install -y docker-ce-18.06.3.ce-3.el7 -q
7, 启动Docker,命令:systemctl start docker,然后加入开机启动,如下
[root@localhost ~]# systemctl start docker

8,验证安装是否成功(有client和service两部分表示docker安装启动都成功了

[root@localhost ~]# docker version

继续阅读“docker安装应用”