network.py 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2015-2020
  5. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  6. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. from distutils import spawn
  22. import asyncio
  23. import subprocess
  24. import sys
  25. import time
  26. import unittest
  27. import qubes.tests
  28. import qubes.firewall
  29. import qubes.vm.qubesvm
  30. import qubes.vm.appvm
  31. # noinspection PyAttributeOutsideInit,PyPep8Naming
  32. class VmNetworkingMixin(object):
  33. test_ip = '192.168.123.45'
  34. test_name = 'test.example.com'
  35. ping_cmd = 'ping -W 1 -n -c 1 {target}'
  36. ping_ip = ping_cmd.format(target=test_ip)
  37. ping_name = ping_cmd.format(target=test_name)
  38. # filled by load_tests
  39. template = None
  40. def run_cmd(self, vm, cmd, user="root"):
  41. '''Run a command *cmd* in a *vm* as *user*. Return its exit code.
  42. :type self: qubes.tests.SystemTestCase | VmNetworkingMixin
  43. :param qubes.vm.qubesvm.QubesVM vm: VM object to run command in
  44. :param str cmd: command to execute
  45. :param std user: user to execute command as
  46. :return int: command exit code
  47. '''
  48. try:
  49. self.loop.run_until_complete(vm.run_for_stdio(cmd, user=user))
  50. except subprocess.CalledProcessError as e:
  51. return e.returncode
  52. return 0
  53. def setUp(self):
  54. '''
  55. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  56. '''
  57. super(VmNetworkingMixin, self).setUp()
  58. if self.template.startswith('whonix-'):
  59. self.skipTest("Test not supported here - Whonix uses its own "
  60. "firewall settings")
  61. if self.template.endswith('-minimal'):
  62. self.skipTest(
  63. "Test not supported here - minimal template don't have "
  64. "networking packages by default")
  65. self.init_default_template(self.template)
  66. self.testnetvm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  67. name=self.make_vm_name('netvm1'),
  68. label='red')
  69. self.loop.run_until_complete(self.testnetvm.create_on_disk())
  70. self.testnetvm.provides_network = True
  71. self.testnetvm.netvm = None
  72. self.testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  73. name=self.make_vm_name('vm1'),
  74. label='red')
  75. self.loop.run_until_complete(self.testvm1.create_on_disk())
  76. self.testvm1.netvm = self.testnetvm
  77. self.app.save()
  78. self.configure_netvm()
  79. def configure_netvm(self):
  80. '''
  81. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  82. '''
  83. def run_netvm_cmd(cmd):
  84. try:
  85. self.loop.run_until_complete(
  86. self.testnetvm.run_for_stdio(cmd, user='root'))
  87. except subprocess.CalledProcessError as e:
  88. self.fail("Command '%s' failed: %s%s" %
  89. (cmd, e.stdout.decode(), e.stderr.decode()))
  90. if not self.testnetvm.is_running():
  91. self.loop.run_until_complete(self.testnetvm.start())
  92. # Ensure that dnsmasq is installed:
  93. try:
  94. self.loop.run_until_complete(self.testnetvm.run_for_stdio(
  95. 'dnsmasq --version', user='root'))
  96. except subprocess.CalledProcessError:
  97. self.skipTest("dnsmasq not installed")
  98. run_netvm_cmd("ip link add test0 type dummy")
  99. run_netvm_cmd("ip link set test0 up")
  100. run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
  101. run_netvm_cmd("iptables -I INPUT -d {} -j ACCEPT --wait".format(
  102. self.test_ip))
  103. # ignore failure
  104. self.run_cmd(self.testnetvm, "pkill dnsmasq")
  105. run_netvm_cmd("dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format(
  106. ip=self.test_ip, name=self.test_name))
  107. run_netvm_cmd("echo nameserver {} > /etc/resolv.conf".format(
  108. self.test_ip))
  109. run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")
  110. def test_000_simple_networking(self):
  111. '''
  112. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  113. '''
  114. self.loop.run_until_complete(self.testvm1.start())
  115. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  116. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  117. def test_010_simple_proxyvm(self):
  118. '''
  119. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  120. '''
  121. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  122. name=self.make_vm_name('proxy'),
  123. label='red')
  124. self.proxy.provides_network = True
  125. self.proxy.netvm = self.testnetvm
  126. self.loop.run_until_complete(self.proxy.create_on_disk())
  127. self.testvm1.netvm = self.proxy
  128. self.app.save()
  129. self.loop.run_until_complete(self.testvm1.start())
  130. self.assertTrue(self.proxy.is_running())
  131. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  132. "Ping by IP from ProxyVM failed")
  133. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  134. "Ping by name from ProxyVM failed")
  135. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  136. "Ping by IP from AppVM failed")
  137. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  138. "Ping by IP from AppVM failed")
  139. @qubes.tests.expectedFailureIfTemplate('debian-7')
  140. @unittest.skipUnless(spawn.find_executable('xdotool'),
  141. "xdotool not installed")
  142. def test_020_simple_proxyvm_nm(self):
  143. '''
  144. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  145. '''
  146. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  147. name=self.make_vm_name('proxy'),
  148. label='red')
  149. self.proxy.provides_network = True
  150. self.loop.run_until_complete(self.proxy.create_on_disk())
  151. self.proxy.netvm = self.testnetvm
  152. self.proxy.features['service.network-manager'] = True
  153. self.testvm1.netvm = self.proxy
  154. self.app.save()
  155. self.loop.run_until_complete(self.testvm1.start())
  156. self.assertTrue(self.proxy.is_running())
  157. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  158. "Ping by IP failed")
  159. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  160. "Ping by name failed")
  161. # reconnect to make sure that device was configured by NM
  162. self.assertEqual(
  163. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  164. user="user"),
  165. 0, "Failed to disconnect eth0 using nmcli")
  166. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  167. "Network should be disabled, but apparently it isn't")
  168. self.assertEqual(
  169. self.run_cmd(self.proxy,
  170. 'nmcli connection up "VM uplink eth0" ifname eth0',
  171. user="user"),
  172. 0, "Failed to connect eth0 using nmcli")
  173. self.assertEqual(self.run_cmd(self.proxy, "nm-online", user="user"), 0,
  174. "Failed to wait for NM connection")
  175. # check for nm-applet presence
  176. self.assertEqual(subprocess.call([
  177. 'xdotool', 'search', '--class', '{}:nm-applet'.format(
  178. self.proxy.name)],
  179. stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
  180. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  181. "Ping by IP failed (after NM reconnection")
  182. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  183. "Ping by name failed (after NM reconnection)")
  184. def test_030_firewallvm_firewall(self):
  185. '''
  186. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  187. '''
  188. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  189. name=self.make_vm_name('proxy'),
  190. label='red')
  191. self.proxy.provides_network = True
  192. self.loop.run_until_complete(self.proxy.create_on_disk())
  193. self.proxy.netvm = self.testnetvm
  194. self.testvm1.netvm = self.proxy
  195. self.app.save()
  196. # block all for first
  197. self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
  198. self.testvm1.firewall.save()
  199. self.loop.run_until_complete(self.testvm1.start())
  200. self.assertTrue(self.proxy.is_running())
  201. server = self.loop.run_until_complete(self.testnetvm.run(
  202. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  203. try:
  204. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  205. "Ping by IP from ProxyVM failed")
  206. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  207. "Ping by name from ProxyVM failed")
  208. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  209. "Ping by IP should be blocked")
  210. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  211. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  212. "TCP connection should be blocked")
  213. # block all except ICMP
  214. self.testvm1.firewall.rules = [(
  215. qubes.firewall.Rule(None, action='accept', proto='icmp')
  216. )]
  217. self.testvm1.firewall.save()
  218. # Ugly hack b/c there is no feedback when the rules are actually
  219. # applied
  220. time.sleep(3)
  221. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  222. "Ping by IP failed (should be allowed now)")
  223. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  224. "Ping by name should be blocked")
  225. # all TCP still blocked
  226. self.testvm1.firewall.rules = [
  227. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  228. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  229. ]
  230. self.testvm1.firewall.save()
  231. # Ugly hack b/c there is no feedback when the rules are actually
  232. # applied
  233. time.sleep(3)
  234. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  235. "Ping by name failed (should be allowed now)")
  236. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  237. "TCP connection should be blocked")
  238. # block all except target
  239. self.testvm1.firewall.rules = [
  240. qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip,
  241. proto='tcp', dstports=1234),
  242. ]
  243. self.testvm1.firewall.save()
  244. # Ugly hack b/c there is no feedback when the rules are actually
  245. # applied
  246. time.sleep(3)
  247. self.assertEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  248. "TCP connection failed (should be allowed now)")
  249. # allow all except target
  250. self.testvm1.firewall.rules = [
  251. qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip,
  252. proto='tcp', dstports=1234),
  253. qubes.firewall.Rule(action='accept'),
  254. ]
  255. self.testvm1.firewall.save()
  256. # Ugly hack b/c there is no feedback when the rules are actually
  257. # applied
  258. time.sleep(3)
  259. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  260. "TCP connection should be blocked")
  261. finally:
  262. server.terminate()
  263. self.loop.run_until_complete(server.wait())
  264. def test_040_inter_vm(self):
  265. '''
  266. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  267. '''
  268. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  269. name=self.make_vm_name('proxy'),
  270. label='red')
  271. self.loop.run_until_complete(self.proxy.create_on_disk())
  272. self.proxy.provides_network = True
  273. self.proxy.netvm = self.testnetvm
  274. self.testvm1.netvm = self.proxy
  275. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  276. name=self.make_vm_name('vm2'),
  277. label='red')
  278. self.loop.run_until_complete(self.testvm2.create_on_disk())
  279. self.testvm2.netvm = self.proxy
  280. self.app.save()
  281. self.loop.run_until_complete(asyncio.wait([
  282. self.testvm1.start(),
  283. self.testvm2.start()]))
  284. self.assertNotEqual(self.run_cmd(self.testvm1,
  285. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  286. self.testvm2.netvm = self.testnetvm
  287. self.assertNotEqual(self.run_cmd(self.testvm1,
  288. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  289. self.assertNotEqual(self.run_cmd(self.testvm2,
  290. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  291. self.testvm1.netvm = self.testnetvm
  292. self.assertNotEqual(self.run_cmd(self.testvm1,
  293. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  294. self.assertNotEqual(self.run_cmd(self.testvm2,
  295. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  296. def test_050_spoof_ip(self):
  297. '''Test if VM IP spoofing is blocked
  298. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  299. '''
  300. self.loop.run_until_complete(self.testvm1.start())
  301. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  302. self.assertEqual(self.run_cmd(self.testnetvm,
  303. 'iptables -I INPUT -i vif+ ! -s {} -p icmp -j LOG'.format(
  304. self.testvm1.ip)), 0)
  305. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  306. 'ip addr flush dev eth0 && '
  307. 'ip addr add 10.137.1.128/24 dev eth0 && '
  308. 'ip route add default dev eth0',
  309. user='root'))
  310. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  311. "Spoofed ping should be blocked")
  312. try:
  313. (output, _) = self.loop.run_until_complete(
  314. self.testnetvm.run_for_stdio('iptables -nxvL INPUT',
  315. user='root'))
  316. except subprocess.CalledProcessError:
  317. self.fail('iptables -nxvL INPUT failed')
  318. output = output.decode().splitlines()
  319. packets = output[2].lstrip().split()[0]
  320. self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
  321. def test_100_late_xldevd_startup(self):
  322. '''Regression test for #1990
  323. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  324. '''
  325. # Simulater late xl devd startup
  326. cmd = "systemctl stop xendriverdomain"
  327. if self.run_cmd(self.testnetvm, cmd) != 0:
  328. self.fail("Command '%s' failed" % cmd)
  329. self.loop.run_until_complete(self.testvm1.start())
  330. cmd = "systemctl start xendriverdomain"
  331. if self.run_cmd(self.testnetvm, cmd) != 0:
  332. self.fail("Command '%s' failed" % cmd)
  333. # let it initialize the interface(s)
  334. time.sleep(1)
  335. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  336. def test_110_dynamic_attach(self):
  337. self.testvm1.netvm = None
  338. self.loop.run_until_complete(self.testvm1.start())
  339. self.testvm1.netvm = self.testnetvm
  340. # wait for it to settle down
  341. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  342. 'udevadm settle'))
  343. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  344. def test_111_dynamic_detach_attach(self):
  345. self.loop.run_until_complete(self.testvm1.start())
  346. self.testvm1.netvm = None
  347. # wait for it to settle down
  348. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  349. 'udevadm settle'))
  350. self.testvm1.netvm = self.testnetvm
  351. # wait for it to settle down
  352. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  353. 'udevadm settle'))
  354. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  355. def test_112_reattach_after_provider_shutdown(self):
  356. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  357. name=self.make_vm_name('proxy'),
  358. label='red')
  359. self.proxy.provides_network = True
  360. self.proxy.netvm = self.testnetvm
  361. self.loop.run_until_complete(self.proxy.create_on_disk())
  362. self.testvm1.netvm = self.proxy
  363. self.loop.run_until_complete(self.testvm1.start())
  364. self.loop.run_until_complete(self.proxy.shutdown(force=True, wait=True))
  365. self.loop.run_until_complete(self.proxy.start())
  366. # wait for it to settle down
  367. self.loop.run_until_complete(self.wait_for_session(self.proxy))
  368. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  369. def test_113_reattach_after_provider_kill(self):
  370. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  371. name=self.make_vm_name('proxy'),
  372. label='red')
  373. self.proxy.provides_network = True
  374. self.proxy.netvm = self.testnetvm
  375. self.loop.run_until_complete(self.proxy.create_on_disk())
  376. self.testvm1.netvm = self.proxy
  377. self.loop.run_until_complete(self.testvm1.start())
  378. self.loop.run_until_complete(self.proxy.kill())
  379. self.loop.run_until_complete(self.proxy.start())
  380. # wait for it to settle down
  381. self.loop.run_until_complete(self.wait_for_session(self.proxy))
  382. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  383. def test_114_reattach_after_provider_crash(self):
  384. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  385. name=self.make_vm_name('proxy'),
  386. label='red')
  387. self.proxy.provides_network = True
  388. self.proxy.netvm = self.testnetvm
  389. self.loop.run_until_complete(self.proxy.create_on_disk())
  390. self.testvm1.netvm = self.proxy
  391. self.loop.run_until_complete(self.testvm1.start())
  392. p = self.loop.run_until_complete(self.proxy.run(
  393. 'echo c > /proc/sysrq-trigger', user='root'))
  394. self.loop.run_until_complete(p.wait())
  395. timeout = 10
  396. while self.proxy.is_running():
  397. self.loop.run_until_complete(asyncio.sleep(1))
  398. timeout -= 1
  399. self.assertGreater(timeout, 0,
  400. 'timeout waiting for crash cleanup')
  401. self.loop.run_until_complete(self.proxy.start())
  402. # wait for it to settle down
  403. self.loop.run_until_complete(self.wait_for_session(self.proxy))
  404. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  405. def test_200_fake_ip_simple(self):
  406. '''Test hiding VM real IP
  407. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  408. '''
  409. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  410. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  411. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  412. self.app.save()
  413. self.loop.run_until_complete(self.testvm1.start())
  414. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  415. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  416. try:
  417. (output, _) = self.loop.run_until_complete(
  418. self.testvm1.run_for_stdio(
  419. 'ip addr show dev eth0', user='root'))
  420. except subprocess.CalledProcessError:
  421. self.fail('ip addr show dev eth0 failed')
  422. output = output.decode()
  423. self.assertIn('192.168.1.128', output)
  424. self.assertNotIn(str(self.testvm1.ip), output)
  425. try:
  426. (output, _) = self.loop.run_until_complete(
  427. self.testvm1.run_for_stdio('ip route show', user='root'))
  428. except subprocess.CalledProcessError:
  429. self.fail('ip route show failed')
  430. output = output.decode()
  431. self.assertIn('192.168.1.1', output)
  432. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  433. def test_201_fake_ip_without_gw(self):
  434. '''Test hiding VM real IP
  435. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  436. '''
  437. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  438. self.app.save()
  439. self.loop.run_until_complete(self.testvm1.start())
  440. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  441. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  442. try:
  443. (output, _) = self.loop.run_until_complete(
  444. self.testvm1.run_for_stdio('ip addr show dev eth0',
  445. user='root'))
  446. except subprocess.CalledProcessError:
  447. self.fail('ip addr show dev eth0 failed')
  448. output = output.decode()
  449. self.assertIn('192.168.1.128', output)
  450. self.assertNotIn(str(self.testvm1.ip), output)
  451. def test_202_fake_ip_firewall(self):
  452. '''Test hiding VM real IP, firewall
  453. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  454. '''
  455. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  456. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  457. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  458. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  459. name=self.make_vm_name('proxy'),
  460. label='red')
  461. self.proxy.provides_network = True
  462. self.loop.run_until_complete(self.proxy.create_on_disk())
  463. self.proxy.netvm = self.testnetvm
  464. self.testvm1.netvm = self.proxy
  465. self.app.save()
  466. # block all but ICMP and DNS
  467. self.testvm1.firewall.rules = [
  468. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  469. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  470. ]
  471. self.testvm1.firewall.save()
  472. self.loop.run_until_complete(self.testvm1.start())
  473. self.assertTrue(self.proxy.is_running())
  474. server = self.loop.run_until_complete(self.testnetvm.run(
  475. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  476. try:
  477. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  478. "Ping by IP from ProxyVM failed")
  479. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  480. "Ping by name from ProxyVM failed")
  481. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  482. "Ping by IP should be allowed")
  483. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  484. "Ping by name should be allowed")
  485. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  486. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  487. "TCP connection should be blocked")
  488. finally:
  489. server.terminate()
  490. self.loop.run_until_complete(server.wait())
  491. def test_203_fake_ip_inter_vm_allow(self):
  492. '''Access VM with "fake IP" from other VM (when firewall allows)
  493. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  494. '''
  495. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  496. name=self.make_vm_name('proxy'),
  497. label='red')
  498. self.loop.run_until_complete(self.proxy.create_on_disk())
  499. self.proxy.provides_network = True
  500. self.proxy.netvm = self.testnetvm
  501. self.testvm1.netvm = self.proxy
  502. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  503. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  504. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  505. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  506. name=self.make_vm_name('vm2'),
  507. label='red')
  508. self.loop.run_until_complete(self.testvm2.create_on_disk())
  509. self.testvm2.netvm = self.proxy
  510. self.app.save()
  511. self.loop.run_until_complete(self.testvm1.start())
  512. self.loop.run_until_complete(self.testvm2.start())
  513. cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
  514. self.testvm2.ip, self.testvm1.ip)
  515. try:
  516. self.loop.run_until_complete(self.proxy.run_for_stdio(
  517. cmd, user='root'))
  518. except subprocess.CalledProcessError as e:
  519. raise AssertionError(
  520. '{} failed with: {}'.format(cmd, e.returncode)) from None
  521. try:
  522. cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format(self.testvm2.ip)
  523. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  524. cmd, user='root'))
  525. except subprocess.CalledProcessError as e:
  526. raise AssertionError(
  527. '{} failed with: {}'.format(cmd, e.returncode)) from None
  528. self.assertEqual(self.run_cmd(self.testvm2,
  529. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  530. try:
  531. cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip)
  532. (stdout, _) = self.loop.run_until_complete(
  533. self.testvm1.run_for_stdio(cmd, user='root'))
  534. except subprocess.CalledProcessError as e:
  535. raise AssertionError(
  536. '{} failed with {}'.format(cmd, e.returncode)) from None
  537. self.assertNotEqual(stdout.decode().split()[0], '0',
  538. 'Packets didn\'t managed to the VM')
  539. def test_204_fake_ip_proxy(self):
  540. '''Test hiding VM real IP
  541. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  542. '''
  543. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  544. name=self.make_vm_name('proxy'),
  545. label='red')
  546. self.loop.run_until_complete(self.proxy.create_on_disk())
  547. self.proxy.provides_network = True
  548. self.proxy.netvm = self.testnetvm
  549. self.proxy.features['net.fake-ip'] = '192.168.1.128'
  550. self.proxy.features['net.fake-gateway'] = '192.168.1.1'
  551. self.proxy.features['net.fake-netmask'] = '255.255.255.0'
  552. self.testvm1.netvm = self.proxy
  553. self.app.save()
  554. self.loop.run_until_complete(self.testvm1.start())
  555. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0)
  556. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0)
  557. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  558. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  559. try:
  560. (output, _) = self.loop.run_until_complete(
  561. self.proxy.run_for_stdio(
  562. 'ip addr show dev eth0', user='root'))
  563. except subprocess.CalledProcessError:
  564. self.fail('ip addr show dev eth0 failed')
  565. output = output.decode()
  566. self.assertIn('192.168.1.128', output)
  567. self.assertNotIn(str(self.testvm1.ip), output)
  568. try:
  569. (output, _) = self.loop.run_until_complete(
  570. self.proxy.run_for_stdio(
  571. 'ip route show', user='root'))
  572. except subprocess.CalledProcessError:
  573. self.fail('ip route show failed')
  574. output = output.decode()
  575. self.assertIn('192.168.1.1', output)
  576. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  577. try:
  578. (output, _) = self.loop.run_until_complete(
  579. self.testvm1.run_for_stdio(
  580. 'ip addr show dev eth0', user='root'))
  581. except subprocess.CalledProcessError:
  582. self.fail('ip addr show dev eth0 failed')
  583. output = output.decode()
  584. self.assertNotIn('192.168.1.128', output)
  585. self.assertIn(str(self.testvm1.ip), output)
  586. try:
  587. (output, _) = self.loop.run_until_complete(
  588. self.testvm1.run_for_stdio(
  589. 'ip route show', user='root'))
  590. except subprocess.CalledProcessError:
  591. self.fail('ip route show failed')
  592. output = output.decode()
  593. self.assertIn('192.168.1.128', output)
  594. self.assertNotIn(str(self.proxy.ip), output)
  595. def test_210_custom_ip_simple(self):
  596. '''Custom AppVM IP
  597. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  598. '''
  599. self.testvm1.ip = '192.168.1.1'
  600. self.app.save()
  601. self.loop.run_until_complete(self.testvm1.start())
  602. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  603. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  604. def test_211_custom_ip_proxy(self):
  605. '''Custom ProxyVM IP
  606. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  607. '''
  608. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  609. name=self.make_vm_name('proxy'),
  610. label='red')
  611. self.loop.run_until_complete(self.proxy.create_on_disk())
  612. self.proxy.provides_network = True
  613. self.proxy.netvm = self.testnetvm
  614. self.proxy.ip = '192.168.1.1'
  615. self.testvm1.netvm = self.proxy
  616. self.app.save()
  617. self.loop.run_until_complete(self.testvm1.start())
  618. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  619. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  620. def test_212_custom_ip_firewall(self):
  621. '''Custom VM IP and firewall
  622. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  623. '''
  624. self.testvm1.ip = '192.168.1.1'
  625. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  626. name=self.make_vm_name('proxy'),
  627. label='red')
  628. self.proxy.provides_network = True
  629. self.loop.run_until_complete(self.proxy.create_on_disk())
  630. self.proxy.netvm = self.testnetvm
  631. self.testvm1.netvm = self.proxy
  632. self.app.save()
  633. # block all but ICMP and DNS
  634. self.testvm1.firewall.rules = [
  635. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  636. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  637. ]
  638. self.testvm1.firewall.save()
  639. self.loop.run_until_complete(self.testvm1.start())
  640. self.assertTrue(self.proxy.is_running())
  641. server = self.loop.run_until_complete(self.testnetvm.run(
  642. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  643. try:
  644. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  645. "Ping by IP from ProxyVM failed")
  646. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  647. "Ping by name from ProxyVM failed")
  648. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  649. "Ping by IP should be allowed")
  650. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  651. "Ping by name should be allowed")
  652. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  653. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  654. "TCP connection should be blocked")
  655. finally:
  656. server.terminate()
  657. self.loop.run_until_complete(server.wait())
  658. def create_testcases_for_templates():
  659. yield from qubes.tests.create_testcases_for_templates('VmNetworking',
  660. VmNetworkingMixin, qubes.tests.SystemTestCase,
  661. module=sys.modules[__name__])
  662. def load_tests(loader, tests, pattern):
  663. tests.addTests(loader.loadTestsFromNames(
  664. create_testcases_for_templates()))
  665. return tests
# Hand the generator function to qubes.tests at import time; judging by the
# helper's name it decides there whether the per-template test case classes
# get created right away (behaviour of the helper is not visible here).
qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates)