Contents

Python-第三方库-Scapy

PYPI官网

scapy官网

scapy官网文档

scapy官网api

scapy官方api常用文档,scapy.layers packagescapy.layers.inetscapy.sendrecv

CSDN 迅鹿 libpcap使用整理

CSDN micromicrofat libpcap、struct、dpkt、scapy、pyshark五种方式获取pcap原始包的速度对比

通用文档

介绍

Scapy 是一个 Python 程序,它使用户能够发送、嗅探、剖析和伪造网络数据包。此功能允许构建可以探测、扫描或攻击网络的工具。

Scapy主要做两件事:发送数据包和接收应答。和go的gopackage的功能是类似的。

https://scapy.readthedocs.io/en/latest/_images/scapy-concept.png
运行机制

下载安装

类型 包含 命令
默认 只有 Scapy pip install scapy
基本的 Scapy 和 IPython。强烈推荐 pip install --pre scapy[basic]
完全的 Scapy 及其所有主要依赖项 pip install --pre scapy[complete]

用法

启动 Scapy

1
scapy

脚本

1
from scapy.all import *

变量

  • conf.use_pcap:提供原生套接字和libpcap套接字,默认使用原生套接字,设为true将socket指向conf.L2socket和conf.L3socket

常用函数

  • rdpcap(pwd):读取pwd pcap文件,可以是绝对路径和相对路径,返回PacketList
  • pdfdump(pwd,layer_shift=1):图形转储为pwd文件,可以是绝对路径和相对路径,pwd为None时为临时文件,需要安装PyX和TexLive
  • wrpcap(“temp.cap”,pkts):将pkts存储为temp.cap
  • save_session():保存所有python会话变量
  • load_session():加载保存的python会话变量
  • arp_mitm(“192.168.122.156”, “192.168.122.17”):实施arp中间人攻击,需要设置sysctl net.ipv4.conf.virbr0.send_redirects=0 # virbr0 = interface;sysctl net.ipv4.ip_forward=1
  • lsc():可用函数
  • ls():可用层列表
  • explore():显示现有图层的图形界面

  • send():发送第 3 层数据包,loop参数选择发送次数
  • sendp():发送第 2 层数据包,可以配置网卡接口和链路层协议,iface参数选择网卡接口,inter参数设置数据包发送间隔
  • sr():函数用于发送 3 层数据包和接收响应,该函数返回(ans SndRcvList,unans PacketList),ans是被响应的(数据包,响应),unans是未响应数据包集,retry 指定重新发送未应答的数据包最大次数,timeout参数指定发送完最后一个数据包后等待的时间。threaded开启新线程来发送(开了时间优化),filter参数使用BPF过滤器(注意BPF过滤器的语法ip指的是含有ip头,那么icmp包,udp包或者http包等应用层数据包也能被嗅探到)所有的具有接收功能的函数都具有这个参数。
  • sr1():返回一个数据包,该数据包响应了发送的 3 层数据包(或数据包集),
  • srp():对第 2 层数据包(以太网、802.3 等)执行sr函数相同的操作,返回值类型也相同
  • srloop():循环发送和接收包

  • fuzz():生成数据包对象,除了你给出的参数和需要计算的数据包字段如校验和,其他字段都是随机的。注意:IP 层 src 和 dst 参数不能使用fuzz()随机而是使用 RandIP()
  • RawVal():注入一些不适合该类型的值,pkt = IP(len=RawVal(b"NotAnInteger"), src="127.0.0.1")

  • sniff():嗅探网卡接口,filter参数设置操作系统的 BPF 过滤器,count设置嗅探数据包个数,iface设置选择的网卡, prn设置处理函数,session参数可以对同一session的碎片数据包进行整理后处理,这里有一个特殊参数offline用于读取pcap文件,lfilter参数的值是一个函数用于再prn参数和filter参数之间进行一个过滤

freeCodeCamp.org 如何使用 Scapy——Python 网络工具详解

https://www.freecodecamp.org/news/content/images/2022/12/image-97.png

  • load_module(“p0f”):载入其他程序,比如p0f
  • p0f(p):调用p0f验证发送数据包p机器的操作系统类型

  • hexdump(pkt,True):将层转换为十六进制串
  • import_hexcap(a):将十六进制串转换为Raw二进制字符串
  • raw(pkt):将层转换为Raw二进制字符串
  • export_object(pkt):将层转换为Base64编码
  • import_object(a):将Base64编码转换为Raw二进制字符串

  • in4_chksum(socket.IPPROTO_UDP, packet[IP], udp_raw):计算checksum,packet[IP]为IP头,udp_raw为udp包

packet

Packet

数据包类

变量

  • payload:负载
  • name:名字

常用方法

  • add_payload():添加负载
  • copy():深拷贝该数据包实例
  • sprintf(format[, relax=1])→ str:格式化输出数据包信息,p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
  • lastlayer(layer: Packet | None = None)→ Packet:返回最上层
  • layers()→ List[Type[Packet]:返回该数据包具有的层类型组成的列表
  • remove_payload()→ None:移除负载
  • remove_underlayer(other: Packet)→ None:移除层
  • show():数据包的开发视图
  • show2():在show的基础上加上校验和
  • decode_payload_as():更改有效负载的解码方式
  • getlayer(cls: int | Type[Packet] | str, nb: int = 1, _track: List[int] | None = None, _subclass: bool | None = None, **flt: Any)→ Packet | None:获取某层,如果没有返回None

运算符

  • []:取数据包的层
  • /运算符组合两层或字符串

Raw

Packet的子类

二进制串

layer

layer指的是Packet的一系列子类,比如IP,TCP等

layer类官方文档

layer.inet官方文档

属性

属性通过.运算符访问

运算符

1
2
IP()/TCP()/"GET / HTTP/1.0\r\n\r\n"
Ether()/IP()/IP()/UDP()
  • del(a.ttl)内置函数删除层属性配置
  • =进行赋值,层之间可以任意赋值,但是只会解析该类型层的属性,其余不能解析的都作为raw对象放到最后面

常用函数

  • raw(pkt):将层转换为Raw二进制字符串
  • hexdump(pkt):十六进制转储
  • ls(pkt):字段值列表

常用方法

  • hide_defaults():不显示与默认值相同的字段
  • summary():一行总结
  • psdump():绘制带有解释的 PostScript 图
  • pdfdump():绘制带有解释的 PDF
  • command():返回可以生成该数据包的 Scapy 命令

PacketList

属性设置为元组,列表或者特殊字符串的layer表示多个layer,可以用他们构造PacketList对象,这样可以对他们进行类似列表操作

元组表示属性范围,列表表示单个属性,可以进行组合,ip字符串可以添加-符号来表示一段ip比如192.168.1.1-254

可以将数据包集转换到PacketList对象中,该对象提供一些列表操作

可以进行拆解遍历

1
2
3
4
p = PacketList(a)
p = PacketList([p for p in a/c]) # 原文中这里的a是IP层集,c是TCP层集
for snd,rcv in p:
    ...

运算符

  • +:将结果加起来

常用方法:

  • summary():显示每个数据包的摘要列表,可以使用lambda表达式,比如ans.summary( lambda s,r: r.sprintf("%TCP.sport% \t %TCP.flags%") )
  • nsummary():带有数据包编号
  • conversations():显示对话图
  • show():显示首选表示
  • filter():返回使用 lambda 函数过滤的数据包列表
  • hexdump():返回所有数据包的 hexdump
  • hexraw():返回所有数据包Raw二进制字符串的 hexdump
  • padding():返回带有填充的数据包的 hexdump
  • nzpadding():返回具有非零填充的数据包的 hexdump
  • plot():绘制应用于数据包列表的 lambda 函数
  • make_table():根据 lambda 函数显示表格,lambda表达式接收p为层,返回第一个参数是列名,第二个参数是行名,第三个参数是单元格中的名称

语法

1
2
for p in packetlist:
  ...

SndRcvList

其实就是QueryAnswer的列表

可以进行拆解遍历

1
2
for snd,rcv in ans:
    ...

变量

  • res:QueryAnswer类型的列表

运算符

  • +:将结果加起来

常用方法

  • make_table():根据 lambda 函数显示表格,lambda表达式接收s和r作为发送数据包和响应数据包,返回第一个参数是列名,第二个参数是行名,第三个参数是单元格中的名称
1
2
3
ans.make_table(
   lambda s,r: (s.dst, s.dport,
   r.sprintf("{TCP:%TCP.flags%}{ICMP:%IP.src% - %ICMP.type%}")))
  • PacketList.plot():绘制应用于数据包列表的 lambda 函数
1
a.plot(lambda x:x[1].id)

语法

1
2
for s,r in sndrcvList:
  ...

TracerouteResult

并行发送包,返回和sr类似的结果,因为TracerouteResult继承自SndRcvList,可以将SndRcvList的res属性转换为TracerouteResult:a = TracerouteResult(a.res),这个可以在traceroute函数里面看见,另外SndRcvList具有的属性和方法它也有

变量

  • res:QueryAnswer类型的列表

常用函数

  • TracerouteResult(SndRcvList.res):将SndRcvList类型对象转换为TracerouteResult对象
  • traceroute():对目标ip进行普通基于icmp不可达报文的transroute,maxttl参数设置最大ttl,minttl参数设置最小ttl,l4参数可以添加第四层负载

运算符

  • +:将结果加起来

常用方法

  • graph():制作有向图,根据AS进行聚类,需要安装graphviz,非常好用,target设置保存位置支持相对路径和绝对路径,不设置target表示作为临时文件打开
  • trace3D():安装了 VPython们可以对traceroute的结果对象进行3D表示
  • SndRcvList的方法

语法

1
2
for s,r in tracerouteresult:
  ...

ARPingResult

继承自SndRcvList,和TracerouteResult一样,可以通过a = ARPingResult(a.res)来新建ARPingResult对象

函数

  • arping(dstnet):发送arp包扫描局域网主机,返回ARPingResult和未响应数据包 PacketList的元组

AsyncSniffer

异步嗅探允许以编程方式停止嗅探器而不是使用 ctrl^C,它使用start(),stop()和join()方法

常用函数:

  • AsyncSniffer(iface=“enp0s3”, count=200):构造函数,iface参数选择嗅探网卡接口,count设置嗅探数据包数目,也可以使用prn,store,filter等参数

常用方法:

  • start():开始嗅探
  • t.stop():停止嗅探并返回结果,结果为PacketList
  • t.join():将异步线程join到主线程
  • t.results:返回嗅探结果

route

Scapy 有自己的路由表 conf.route 可以修改它以不同于系统路由表的方式路由数据包

  • conf.route:ipv4路由表
  • conf.route6:ipv6路由表

官网文档

常用函数

  • getmacbyip(“10.0.0.1”):获取ip的mac值

常用方法

  • conf.route.delt(net=“0.0.0.0/0”,gw=“192.168.8.1”):删除路由条目
  • conf.route.add(net=“0.0.0.0/0”,gw=“192.168.8.254”):添加路由条目
  • conf.route.resync():重置路由表,使其和系统一致
  • conf.route.route(“192.168.xxx.xxx”):查询路由结果,返回(interface, outgoing_ip, gateway),可以通过"0.0.0.0"这个特殊地址查看默认网关

QueryAnswer

运算符

  • [0]:query数据包
  • [1]:answer数据包

网卡接口相关

变量

  • conf.ifaces:网卡接口列表,等价于IFACES
  • conf.iface:默认网卡接口,默认网卡并没有什么用,因为始终是通过路由表来选择网卡的

函数

scapy不提供通过ip查主机名的函数,因为socket标准库的gethostbyaddr(ip)函数已经实现了这个功能

  • get_if_hwaddr(conf.iface):获取网卡的mac地址,可以传入网卡对象,也可以传入网卡的name属性对应的字符串
  • get_if_addr(conf.iface):获取网卡的ip地址,可以传入网卡对象,也可以传入网卡的name属性对应的字符串

IFACES

方法

  • reload():重新加载网卡列表
  • dev_from_name():从名字获取网卡对象NetworkInterface
  • dev_from_networkname():从网卡代码获取网卡对象
  • dev_from_index():从网卡索引获取网卡对象

NetworkInterface

属性

  • name:网卡名称
  • description:网卡描述
  • network_name:网卡代码
  • index:网卡索引
  • ip:ip地址
  • mac:mac地址

方法

  • update():更新属性

Scapy.Config.Conf

class scapy.config.Conf

全局变量conf就是该类型的实例,该对象包含 Scapy 的配置

变量

  • AS_resolver: scapy.as_resolvers.AS_resolver= None

Scapy.as_resolvers.AS_resolver

classscapy.as_resolvers.AS_resolver(server: str | None = None, port: int = 43, options: str | None = None)[source]

高级用法

ASN.1 和 SNMP

ASN.1 指定数据交换的格式,与数据编码方式无关。数据编码在编码规则中指定,最常用的编码规则是 BER(Basic Encoding Rules基本编码规则)和 DER(Distinguished Encoding Rules区分编码规则),后者保证编码的唯一性

ASN.1 提供了基本对象:整数、多种字符串、浮点数、布尔值、容器等。它们放到 Universal 类中。给定的协议可以把其他对象放在 Context 类中。例如,SNMP 定义了 PDU_GET 或 PDU_SET 对象,还有 Application和Private类对象

每个对象都有一个标记,编码规则将使用这个标记。Universal类中:1是布尔值,2是整数,3是位字符串,6是 OID,48是序列。Context 类(继承Universal类)中的标记从0xa0开始,我们需要知道上下文才能解码。例如,在 SNMP 上下文中,0xa0是 PDU_GET 对象,而在 X509上下文中,0xa0是证书版本的容器

其他对象是通过组装所有这些基本的砖对象来创建的。组合是使用先前定义或现有对象的序列和数组(集合)完成的。最后一个对象(X509证书、 SNMP 包)是一棵树,其非叶节点是序列并设置对象(或派生的上下文对象) ,其叶节点是整数、字符串、 OID 等。

Scapy 和 ASN.1

Scapy 提供了一种方便地对 ASN.1进行编码或解码的方法,还可以对这些编码器/解码器进行编程。它比 ASN.1解析器要宽松得多,而且有点忽略了约束。它既不会替换 ASN.1解析器,也不会替换 ASN.1编译器。事实上,它被写成能够编码和解码破碎的 ASN.1。它可以处理已损坏的编码字符串,也可以创建这些字符串。

ASN.1 engine

注意: 这里提供的许多类定义都使用元类。如果你没有精确地看源代码,而你只依赖于我的捕捉,你可能会认为他们有时表现出一种神奇的行为。ScapyASN.1引擎提供了链接对象及其标记的类。它们继承自 ASN1_Class。第一个是 ASN1_Class_UNIVERSAL,它为大多数 Universal 对象提供标记。每个新上下文(SNMP,X509)将从它继承并添加自己的对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class ASN1_Class_UNIVERSAL(ASN1_Class):
    name = "UNIVERSAL"
# [...]
    BOOLEAN = 1
    INTEGER = 2
    BIT_STRING = 3
# [...]

class ASN1_Class_SNMP(ASN1_Class_UNIVERSAL):
    name="SNMP"
    PDU_GET = 0xa0
    PDU_NEXT = 0xa1
    PDU_RESPONSE = 0xa2

class ASN1_Class_X509(ASN1_Class_UNIVERSAL):
    name="X509"
    CONT0 = 0xa0
    CONT1 = 0xa1
# [...]

所有 ASN.1对象都由简单的 Python 实例表示,这些实例充当原始值的简单外壳。简单的逻辑由 ASN1_Object 处理,因此它们非常简单:

1
2
3
4
5
6
7
8
class ASN1_INTEGER(ASN1_Object):
    tag = ASN1_Class_UNIVERSAL.INTEGER

class ASN1_STRING(ASN1_Object):
    tag = ASN1_Class_UNIVERSAL.STRING

class ASN1_BIT_STRING(ASN1_STRING):
    tag = ASN1_Class_UNIVERSAL.BIT_STRING

这些实例可以组装成 ASN.1树:

1
2
3
4
5
6
7
8
9
x=ASN1_SEQUENCE([ASN1_INTEGER(7),ASN1_STRING("egg"),ASN1_SEQUENCE([ASN1_BOOLEAN(False)])])
x
<ASN1_SEQUENCE[[<ASN1_INTEGER[7]>, <ASN1_STRING['egg']>, <ASN1_SEQUENCE[[<ASN1_BOOLEAN[False]>]]>]]>
x.show()
# ASN1_SEQUENCE:
  <ASN1_INTEGER[7]>
  <ASN1_STRING['egg']>
  # ASN1_SEQUENCE:
    <ASN1_BOOLEAN[False]>

Encoding engines

与标准一样,ASN.1和编码是独立的。我们刚刚看到了如何创建复合 ASN.1对象。要对其进行编码或解码,我们需要选择一个编码规则。Scapy 目前只提供 BER (实际上,可能是 DER。DER 和 BER相比只有最小的编码被授权),这称为ASN.1编解码器

编码和解码是使用编解码器提供的类方法完成的。例如,BERcodec_INTEGER 类提供了 Enc ()和 Dec ()类方法,可以在编码的字符串和其类型的值之间进行转换。它们都继承自 BERcodec_Object,它能够从任何类型解码对象:

1
2
3
4
5
BERcodec_INTEGER.enc(7)
BERcodec_BIT_STRING.enc("egg")
BERcodec_STRING.enc("egg")
BERcodec_STRING.dec('\x04\x03egg')
BERcodec_Object.dec('\x03\x03egg')

ASN.1对象使用它们的 enc ()方法进行编码。必须使用我们想要使用的编解码器调用此方法。在 ASN1_Codecs 对象中引用所有 codecs。Raw ()也可以使用。在这种情况下,将使用缺省的 codec (config.ASN1_default_codec)。

1
2
3
4
x.enc(ASN1_Codecs.BER)
raw(x)
xx,remain = BERcodec_Object.dec(_)
xx.show()

默认情况下,解码使用 Universal 类,这意味着在 Context 类中定义的对象将不会被解码。这里有一个很好的理由: 解码取决于上下文!

必须指定 Context 类:

1
(dcert,remain) = BERcodec_Object.dec(cert, context=ASN1_Class_X509)

ASN.1 layers

ASN.1 fields

Scapy 提供 ASN.1字段。它们将包装 ASN.1对象,并提供必要的逻辑来将字段名绑定到值。ASN.1数据包将被描述为 ASN.1字段的树。然后,每个字段名将作为一个普通的 Packet 对象提供,并且是flat flavor平面的(例如: 要访问 SNMP 数据包的 version 字段,您不需要知道包装它的容器数量)

每个 ASN.1字段通过其标记链接到一个 ASN.1对象。

ASN.1 packets

ASN.1数据包从 Packet 类继承。不是字段的 fields_desc 列表,它们定义 ASN1_codec 和 ASN1_root 属性。第一个是编解码器(例如: ASN1_Codecs. BER) ,第二个是与 ASN.1字段复合的树。

一个完整的示例: SNMP

SNMP 定义了新的 ASN.1对象。我们需要定义它们:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
class ASN1_Class_SNMP(ASN1_Class_UNIVERSAL):
    name="SNMP"
    PDU_GET = 0xa0
    PDU_NEXT = 0xa1
    PDU_RESPONSE = 0xa2
    PDU_SET = 0xa3
    PDU_TRAPv1 = 0xa4
    PDU_BULK = 0xa5
    PDU_INFORM = 0xa6
    PDU_TRAPv2 = 0xa7

这些对象是 PDU,实际上是 Vector 的新名称(上下文对象通常是这种情况: 它们是具有新名称的旧容器)。这意味着创建相应的 ASN.1对象和 BER 编解码器非常简单:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class ASN1_SNMP_PDU_GET(ASN1_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_GET

class ASN1_SNMP_PDU_NEXT(ASN1_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_NEXT

# [...]

class BERcodec_SNMP_PDU_GET(BERcodec_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_GET

class BERcodec_SNMP_PDU_NEXT(BERcodec_SEQUENCE):
    tag = ASN1_Class_SNMP.PDU_NEXT

元类提供了这样一个神奇的事实: 所有东西都是自动注册的,ASN.1对象和 BER 编解码器可以找到彼此。

ASN.1字段也是一样的:

1
2
3
4
5
class ASN1F_SNMP_PDU_GET(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_SNMP.PDU_GET

class ASN1F_SNMP_PDU_NEXT(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_SNMP.PDU_NEXT

现在,最难的部分,ASN.1数据包:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
SNMP_error = { 0: "no_error",
               1: "too_big",
# [...]
             }

SNMP_trap_types = { 0: "cold_start",
                    1: "warm_start",
# [...]
                  }

class SNMPvarbind(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE( ASN1F_OID("oid","1.3"),
                                ASN1F_field("value",ASN1_NULL(0))
                                )


class SNMPget(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_GET( ASN1F_INTEGER("id",0),
                                    ASN1F_enum_INTEGER("error",0, SNMP_error),
                                    ASN1F_INTEGER("error_index",0),
                                    ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind)
                                    )

class SNMPnext(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SNMP_PDU_NEXT( ASN1F_INTEGER("id",0),
                                     ASN1F_enum_INTEGER("error",0, SNMP_error),
                                     ASN1F_INTEGER("error_index",0),
                                     ASN1F_SEQUENCE_OF("varbindlist", [], SNMPvarbind)
                                     )
# [...]

class SNMP(ASN1_Packet):
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_enum_INTEGER("version", 1, {0:"v1", 1:"v2c", 2:"v2", 3:"v3"}),
        ASN1F_STRING("community","public"),
        ASN1F_CHOICE("PDU", SNMPget(),
                     SNMPget, SNMPnext, SNMPresponse, SNMPset,
                     SNMPtrapv1, SNMPbulk, SNMPinform, SNMPtrapv2)
        )
    def answers(self, other):
        return ( isinstance(self.PDU, SNMPresponse)    and
                 ( isinstance(other.PDU, SNMPget) or
                   isinstance(other.PDU, SNMPnext) or
                   isinstance(other.PDU, SNMPset)    ) and
                 self.PDU.id == other.PDU.id )
# [...]
bind_layers( UDP, SNMP, sport=161)
bind_layers( UDP, SNMP, dport=161)

现在,如何使用它? 像往常一样:

1
2
3
4
a=SNMP(version=3, PDU=SNMPget(varbindlist=[SNMPvarbind(oid="1.2.3",value=5),SNMPvarbind(oid="3.2.1",value="hello")]))
a.show()
hexdump(a)
SNMP(raw(a)).show()
 |